Published on NGA Advanced Python Programming for GIS, GLGI 3001-1 (https://www.e-education.psu.edu/ngapython)

Home > Lessons > Lesson 1: Advanced Python and Multiprocessing > Multiprocessing

Multiprocessing

Python includes several different syntaxial ways of executing processes in parallel. Each way comes with a list of pros and cons. The major hinderance with Python’s multithreading is the serialization and deserialization of data to and from the threads. This is called ‘pickling’ and will be discussed more in detail later in the section. It is important to note that custom classes, such as those found in arcpy (geoprocessing results, Featureclasses or layers) will need a custom serializer and de-serializer for geoprocessing results to be returned from threads. This takes significant work and coding to create and I have yet to see one.  Trying to return a object outside of the built in types will result in an Exception that the object cannot be pickled. The method of multiprocessing that we will be using utilizes the map method that we covered earlier in the lesson as a starmap(), or you can think of it as ‘start map’.  The method starts a new thread for each item in the list and holds the results from each process in a list.

What if you wanted to run different scripts at the same time? The starmap() method is great for a single process done i number of times but you can also be more explicit by using the pool.apply_async() method. Instead of using the map construct, you assign each process to a variable and then call .get() for the results. Note here that the parameters need to be passed as a tuple. Single params need to be passed as (arg,), but if you have more than one parameter to pass, the tuple is (arg1, arg2, arg3).

For example:

with mp.Pool(processes=5) as pool: 
    p1 = pool.apply_async(scriptA, (1param,)) 
    p2 = pool.apply_async(scriptB, (1param, 2param)) 
    p3 = pool.apply_async(scriptC, (1param,)) 
 
    res = [p1.get(), p2.get(), p3.get(), …] 

First steps with multiprocessing

You might have realized that there are generally two broad types of tasks – those that are input/output (I/O) heavy which require a lot of data to be read, written or otherwise moved around; and those that are CPU (or processor) heavy that require a lot of calculations to be done. Because getting data is the slowest part of our operation, I/O heavy tasks do not demonstrate the same improvement in performance from multiprocessing as CPU heavy tasks. The more work there is to do for the CPU the greater the benefit in splitting that workload among a range of processors so that they can share the load.

The other thing that can slow us down is outputting to the screen – although this isn’t really an issue in multiprocessing because printing to our output window can get messy. Think about two print statements executing at exactly the same time – you’re likely to get the content of both intermingled, leading to a very difficult to understand message. Even so, updating the screen with print statements is a slow task.

Don’t believe me? Try this sample piece of code that sums the numbers from 0-100.

import time 
 
start_time = time.time() 
 
sum = 0 
for i in range(0, 100): 
    sum += i 
    print(sum) 
 
# Output how long the process took.  
print("--- %s seconds ---" % (time.time() - start_time))   

If I run it with the print function in the loop the code takes 0.049 seconds to run on my PC. If I comment that print function out, the code runs in 0.0009 seconds.

4278
4371
4465
4560
4656
4753
4851
4950
--- 0.04900026321411133 seconds ---

runfile('C:/Users/jao160/Documents/Teaching_PSU/Geog489_SU_21/Lesson 1/untitled1.py', wdir='C:/Users/jao160/Documents/Teaching_PSU/Geog489_SU_21/Lesson 1')
--- 0.0009996891021728516 seconds ---

You might remember a similar situation in GEOG 485 with the Hi-ho Cherry-O example [1] where we simulated 10,000 runs of this children's game to determine the average number of turns it takes. If we printed out the results, the code took a minute or more to run. If we skipped all but the final print statement the code ran in less than a second.

We’ll revisit that Cherry-O example as we experiment with moving code from the single processor paradigm to multiprocessor. We’ll start with it as a simple, non arcpy example and then move on to two arcpy examples – one raster (our raster calculation example from before) and one vector.

Here’s our original Cherry-O code. (If you did not take GEOG485 and don't know the game, you may want to have a quick look at the description from GEOG485 [1]).

# Simulates 10K game of Hi Ho! Cherry-O  
# Setup _very_ simple timing.  
import time 
 
start_time = time.time() 
import random 
 
spinnerChoices = [-1, -2, -3, -4, 2, 2, 10] 
turns = 0 
totalTurns = 0 
cherriesOnTree = 10 
games = 0 
 
while games < 10001: 
    # Take a turn as long as you have more than 0 cherries  
    cherriesOnTree = 10 
    turns = 0 
    while cherriesOnTree > 0: 
        # Spin the spinner  
        spinIndex = random.randrange(0, 7) 
        spinResult = spinnerChoices[spinIndex] 
        # Print the spin result      
        # print ("You spun " + str(spinResult) + ".")     
        # Add or remove cherries based on the result  
        cherriesOnTree += spinResult 
 
        # Make sure the number of cherries is between 0 and 10     
        if cherriesOnTree > 10: 
            cherriesOnTree = 10 
        elif cherriesOnTree < 0: 
            cherriesOnTree = 0 
            # Print the number of cherries on the tree         
        # print ("You have " + str(cherriesOnTree) + " cherries on your tree.")      
        turns += 1 
    # Print the number of turns it took to win the game  
    # print ("It took you " + str(turns) + " turns to win the game.")  
    games += 1 
    totalTurns += turns 
print("totalTurns " + str(float(totalTurns) / games)) 
# lastline = raw_input(">")  
# Output how long the process took.  
print("--- %s seconds ---" % (time.time() - start_time))  

We've added in our very simple timing from earlier and this example runs for me in about 1/3 of a second (without the intermediate print functions). That is reasonably fast and you might think we won't see a significant improvement from modifying the code to use multiprocessor mode but let's experiment. 

The Cherry-O task is a good example of a CPU bound task; we’re limited only by the calculation speed of our random numbers, as there is no I/O being performed. It is also an embarrassingly parallel task as none of the 10,000 runs of the game are dependent on each other. All we need to know is the average number of turns; there is no need to share any other information. Our logic here could be to have a function (Cherry-O) which plays the game and returns to our calling function the number of turns. We can add that value returned to a variable in the calling function and when we’re done divide by the number of games (e.g. 10,000) and we’ll have our average. 

Lesson content developed by Jan Wallgrun and James O’Brien

Converting from sequential to multiprocessing

So with that in mind, let us examine how we can convert a simple program like a programmatic version of the game Hi Ho Cherry-O from sequential to multiprocessing.

You can download the Hi Ho Cherry-O script [2].

There are a couple of basic steps we need to add to our code in order to support multiprocessing. The first is that our code needs to import multiprocessing which is a Python library which as you will have guessed from the name enables multiprocessing support. We’ll add that as the first line of our code.

The second thing our code needs to have is a __main__ method defined. We’ll add that into our code at the very bottom with:

if __name__ == '__main__': 
    play_a_game()

With this, we make sure that the code in the body of the if-statement is only executed for the main process we start by running our script file in Python, not the subprocesses we will create when using multiprocessing, which also are loading this file. Otherwise, this would result in an infinite creation of subprocesses, subsubprocesses, and so on. Next, we need to have that play_a_game() function we are calling defined. This is the function that will set up our pool of processors and also assign (map) each of our tasks onto a worker (usually a processor) in that pool.

Our play_a_game() function is very simple. It has two main lines of code based on the multiprocessing module:

The first instantiates a pool with a number of workers (usually our number of processors or a number slightly less than our number of processors). There’s a function to determine how many processors we have, multiprocessing.cpu_count(), so that our code can take full advantage of whichever machine it is running on. That first line is:

with multiprocessing.Pool(multiprocessing.cpu_count()) as myPool:
   ... # code for setting up the pool of jobs

You have probably already seen this notation from working with arcpy cursors. This with ... as ... statement creates an object of the Pool class defined in the multiprocessing module and assigns it to variable myPool. The parameter given to it is the number of processors on my machine (which is the value that multiprocessing.cpu_count() is returning), so here we are making sure that all processor cores will be used. All code that uses the variable myPool (e.g., for setting up the pool of multiprocessing jobs) now needs to be indented relative to the "with" and the construct makes sure that everything is cleaned up afterwards. The same could be achieved with the following lines of code:

myPool = multiprocessing.Pool(multiprocessing.cpu_count())
... # code for setting up the pool of jobs
myPool.close()
myPool.join()

Here the Pool variable is created without the with ... as ... statement. As a result, the statements in the last two lines are needed for telling Python that we are done adding jobs to the pool and for cleaning up all sub-processes when we are done to free up resources. We prefer to use the version using the with ... as ... construct in this course.

The next line that we need in our code after the with ... as ... line is for adding tasks (also called jobs) to that pool:

    res = myPool.map(hi_ho_cherry_o, range(10000))

What we have here is the name of another function, hi_ho_cherry_o(), which is going to be doing the work of running a single game and returning the number of turns as the result. The second parameter given to map() contains the parameters that should be given to the calls of thecherryO() function as a simple list. So this is how we are passing data to process to the worker function in a multiprocessing application. In this case, the worker function hi_ho_cherry_o() does not really need any input data to work with. What we are providing is simply the number of the game this call of the function is for, so we use the range from 0-9,999 for this. That means we will have to introduce a parameter into the definiton of the hi_ho_cherry_o() function for playing a single game. While the function will not make any use of this parameter, the number of elements in the list (10000 in this case) will determine how many times hi_ho_cherry_o() will be run in our multiprocessing pool and, hence, how many games will be played to determine the average number of turns. In the final version, we will replace the hard-coded number by an argument called numGames. Later in this part of the lesson, we will show you how you can use a different function called starmap(...) instead of map(...) that works for worker functions that do take more than one argument so that we can pass different parameters to it.

Python will now run the pool of calls of the hi_ho_cherry_o() worker function by distributing them over the number of cores that we provided when creating the Pool object. The returned results, so the number of turns for each game played, will be collected in a single list and we store this list in variable res. We’ll average those turns per game to get an average using the Python library statistics and the function mean().

To prepare for the multiprocessing version, we’ll take our Cherry-O code from before and make a couple of small changes. We’ll define function hi_ho_cherry_o() around this code (taking the game number as parameter as explained above) and we’ll remove the while loop that currently executes the code 10,000 times (our map range above will take care of that) and we’ll therefore need to “dedent“ the code.

Here’s what our revised function will look like :

def hi_ho_cherry_o(game): 
    spinnerChoices = [-1, -2, -3, -4, 2, 2, 10] 
    turns = 0 
    cherriesOnTree = 10 
 
    # Take a turn as long as you have more than 0 cherries  
    while cherriesOnTree > 0: 
        # Spin the spinner  
        spinIndex = random.randrange(0, 7) 
        spinResult = spinnerChoices[spinIndex] 
        # Print the spin result      
        # print ("You spun " + str(spinResult) + ".")  
        # Add or remove cherries based on the result  
        cherriesOnTree += spinResult 
        # Make sure the number of cherries is between 0 and 10     
        if cherriesOnTree > 10: 
            cherriesOnTree = 10 
        elif cherriesOnTree < 0: 
            cherriesOnTree = 0 
            # Print the number of cherries on the tree         
        # print ("You have " + str(cherriesOnTree) + " cherries on your tree.")  
        turns += 1 
    # return the number of turns it took to win the game  
    return turns   

Now lets put it all together. We’ve made a couple of other changes to our code to define a variable at the very top called numGames = 10000 to define the size of our range.

import random
import multiprocessing
import statistics
import time

def hi_ho_cherry_o(game):
    spinnerChoices = [-1, -2, -3, -4, 2, 2, 10]
    turns = 0
    cherriesOnTree = 10

    # Take a turn as long as you have more than 0 cherries
    while cherriesOnTree > 0:
        # Spin the spinner
        spinIndex = random.randrange(0, 7)
        spinResult = spinnerChoices[spinIndex]
        # Print the spin result
        # print ("You spun " + str(spinResult) + ".")
        # Add or remove cherries based on the result
        cherriesOnTree += spinResult
        # Make sure the number of cherries is between 0 and 10
        if cherriesOnTree > 10:
            cherriesOnTree = 10
        elif cherriesOnTree < 0:
            cherriesOnTree = 0
            # Print the number of cherries on the tree
        # print ("You have " + str(cherriesOnTree) + " cherries on your tree.")
        turns += 1
        # return the number of turns it took to win the game
    return turns


def play_a_game(numGames):
    with multiprocessing.Pool(multiprocessing.cpu_count()) as myPool:
        ## The Map function part of the MapReduce is on the right of the = and the Reduce part on the left where we are aggregating the results to a list.
        turns = myPool.map(hi_ho_cherry_o, range(numGames))
        # Uncomment this line to print out the list of total turns (but note this will slow down your code's execution)
        # print(turns)
    # Use the statistics library function mean() to calculate the mean of turns
    print(f'Average turns for {len(turns)} games is {statistics.mean(turns)}')


if __name__ == '__main__':
    start_time = time.time()
    play_a_game(10000)
    # Output how long the process took.
    print(f" Process took {time.time() - start_time} seconds")

You will also see that we have the list of results returned on the left side of the = before our map function (~line 35). We’re taking all of the returned results and putting them into a list called turns (feel free to add a print or type statement here to check that it's a list). Once all of the workers have finished playing the games, we will use the Python library statistics function mean, which we imported at the very top of our code (right after multiprocessing) to calculate the mean of our list in variable turns. The call to mean() will act as our reduce as it takes our list and returns the single value that we're really interested in.

When you have finished writing the code in PyScripter, you can run it.

Lesson content developed by Jan Wallgrun and James O’Brien

Arcpy multiprocessing examples

Now that we have completed a non-ArcGIS parallel processing exercise, let's look at a couple of examples using ArcGIS functions. There are several caveats or gotchas to using multiprocessing with ArcGIS and it is important to cover them up-front because they affect the ways in which we can write our code. 

Esri describes several best practices for multiprocessing with arcpy. These include: 

  • Use “memory“ workspaces to store temporary results because as noted earlier memory is faster than disk. 
  • Avoid writing to file geodatabase (FGDB) data types and GRID raster data types. These data formats can often cause schema locking or synchronization issues. That is because file geodatabases and GRID raster types do not support concurrent writing – that is, only one process can write to them at a time. You might have seen a version of this problem in arcpy previously if you tried to modify a feature class in Python that was open in ArcGIS. That problem is magnified if you have an FGDB and you’re trying to write many feature classes to it at once. Even if all of the feature classes are independent you can only write them to the FGDB one at a time. 
  • Use 64-bit. This isn’t an issue if we are writing code in ArcGIS Pro (although Esri does recommend using a version of Pro greater than 1.4) because we are already using 64-bit, but if you were planning on using Desktop as well, then you would need to use ArcGIS Server 10.5 (or greater) or ArcGIS Desktop with Background Geoprocessing (64-bit). The reason for this is that as we previously noted 64-bit processes can access significantly more memory and using 64-bit might help resolve any large data issues that don’t fit within the 32-bit memory limits of 4GB. 

So bearing the top two points in mind we should make use of memory workspaces wherever possible and we should avoid writing to FGDBs (in our worker functions at least – but we could use them in our master function to merge a number of shapefiles or even individual FGDBs back into a single source). 

Since we work with other packages such as arcpy, it is important to note that Classes within arcpy such as the Featureclass, Layer, Table, Raster, etc.,. cannot be returned from the worker threads without creating custom serializers to serialize and deserialize the objects between threads. This is known as Pickling and is the process of converting the object to JSON and back to an object. This method is beyond the scope of this course but built in classes and objects within python can be returned. For our example, we will return a dictionary containing information of the process.  

Lesson content developed by Jan Wallgrun and James O’Brien

Multiprocessing with raster data

There are two types of operations with rasters that can easily (and productively) be implemented in parallel: operations that are independent components in a workflow, and raster operations which are local, focal or zonal – that is they work on a small portion of a raster such as a pixel or a group of pixels.

Esri’s Clinton Dow and Neeraj Rajasekar presented way back at the 2017 User Conference demonstrating multiprocessing with arcpy and they had a number of useful graphics in their slides which demonstrate these two categories of raster operations which we have reproduced here as they're still appropriate and relevant.

An example of an independent workflow would be if we calculate the slope, aspect and some other operations on a raster and then produce a weighted sum or other statistics. Each of the operations is independently performed on our raster up until the final operation which relies on each of them (see the first image below). Therefore, the independent operations can be parallelized and sent to a worker and the final task (which could also be done by a worker) aggregates or summarises the result. Which is what we can see in the second image as each of the tasks is assigned to a worker (even though two of the workers are using a common dataset) and then Worker 4 completes the task. You can probably imagine a more complex version of this task where it is scaled up to process many elevation and land-use rasters to perform many slope, aspect and reclassification calculations with the results being combined at the end.

parallel problem slide see text description below
Figure 1.17 Slide 15 from Parallel Python
Click for a text description
Slide titled: Pleasingly parallel problems. Slide shows serialized execution of a model workflow as worker 1 does 3 steps sequentially which feed into the final worker 1 which does weighted sum leading to an output suitability raster. The 3 original processes completed by work one are: First, elevation raster to slope, then, elevation raster to aspect, and finally, Land use raster to reclassify. The slide then asks at the bottom, why is multiprocessing relevant to geoprocessing workflows?
Credit: Esri GIS Co., permission requested for use
parallel problem slide see text description below
Figure 1.18 Slide 16 from Parallel Python
Click for a text description
Slide titled: Pleasingly parallel problems. Slide shows parallelized execution of a model workflow as three different workers simultaneously feed into a fourth worker which does weighted sum leading to an output suitability raster. Worker 1 processes elevation raster to slope, worker 2 processes elevation raster to aspect, and worker 3 processes land use raster to reclassify. The slide then asks at the bottom, why is multiprocessing relevant to geoprocessing workflows?
Credit: Esri GIS Co., permission requested for use

An example of the second type of raster operation is a case where we want to make a mathematical calculation on every pixel in a raster such as squaring or taking the square root. Each pixel in a raster is independent of its neighbors in this operation so we could have multiple workers processing multiple tiles in the raster and the result is written to a new raster. In this example, instead of having a single core serially performing a square root calculation across a raster (the first image below) we can segment our raster into a number of tiles, assign each tile to a worker and then perform the square root operation for each pixel in the tile outputting the result to a single raster which is shown in the second image below.

parallel problem slide see text description below
Figure 1.19 Slide 19 from Parallel Python
Click for a text description
Slide titled: Pleasingly parallel problems. Slide shows tool executed serially on a large input dataset. Starts with large elevation raster leading to worker 1 leading to square root math tool and finally output square root raster. The slide then asks at the bottom, why is multiprocessing relevant to geoprocessing workflows?
Credit: Esri GIS Co., permission requested for use
parallel problem slide see text description below
Figure 1.20 Slide 20 from Parallel Python
Click for a text description
Slide titled: Pleasingly parallel problems. Slide shows tool executed parallelly on a large input dataset. Starts with large elevation raster leading to four different workers identically using the square root math tool and all leading to the same output square root raster. The slide then asks at the bottom, why is multiprocessing relevant to geoprocessing workflows?
Credit: Esri GIS Co., permission requested for use

Bearing in mind the caveats about parallel programming from above and the process that we undertook to convert the Hi Ho Cherry-O program, let's begin.

The DEM that we will be using can be downloaded [3] and the sample code is below that we want to conver it is below.

# This script uses map algebra to find values in an 
#  elevation raster greater than 3500 (meters). 
 
import arcpy 
from arcpy.sa import *
 
# Specify the input raster 
inRaster = arcpy.GetParameterAsText(0)
cutoffElevation = arcpy.GetParameter(1)
outPath = arcpy.env.workspace
 
# Check out the Spatial Analyst extension 
arcpy.CheckOutExtension("Spatial") 
 
# Make a map algebra expression and save the resulting raster 
outRaster = Raster(inRaster) > cutoffElevation 
outRaster.save(outPath+"/foxlake_hi_10")
 
# Check in the Spatial Analyst extension now that you're done 
arcpy.CheckInExtension("Spatial")

Our first task is to identify the parts of our problem that can work in parallel and the parts which we need to run sequentially.

The best place to start with this can be with the pseudocode of the original task. If we have documented our sequential code well, this could be as simple as copying/pasting each line of documentation into a new file and working through the process. We can start with the text description of the problem and build our sequential pseudocode from there and then create the multiprocessing pseudocode. It is very important to correctly and carefully design our multiprocessing solutions to ensure that they are as efficient as possible and that the worker functions have the bare minimum of data that they need to complete the tasks, use memory workspaces, and write as little data back to disk as possible.

Our original task was :

Get a list of raster tiles  
For every tile in the list: 
     Fill the DEM 
     Create a slope raster  
     Calculate a flow direction raster 
     Calculate a flow accumulation raster  
     Convert those stream rasters to polygon or polyline feature classes.  

You will notice that I’ve formatted the pseudocode just like Python code with indentations showing which instructions are within the loop.

As this is a simple example we can place all of the functionality within the loop into our worker function as it will be called for every raster. The list of rasters will need to be determined sequentially and we’ll then pass that to our multiprocessing function and let the map element of multiprocessing map each raster onto a worker to perform the tasks. We won’t explicitly be using the reduce part of multiprocessing here as the output will be a featureclass but reduce will probably tidy up after us by deleting temporary files that we don’t need.

Our new pseudocode then will look like :

Get a list of raster tiles  
For every tile in the list: 
    Launch a worker function with the name of a raster 
Worker:
Fill the DEM 
     Create a slope raster  
     Calculate a flow direction raster 
     Calculate a flow accumulation raster  
     Convert those stream rasters to polygon or polyline feature classes.  

Bear in mind that not all multiprocessing conversions are this simple. We need to remember that user output can be complicated because multiple workers might be attempting to write messages to our screen at once and that can cause those messages to get garbled and confused. A workaround for this problem is to use Python’s logging library which is much better at handling messages than us manually using print statements. We haven't implemented logging in this sample solution for this script but feel free to briefly investigate it to supplement the print and arcpy.AddMessage functions with calls to the logging function. The Python Logging Cookbook [4] has some helpful examples.

As an exercise, attempt to implement the conversion from sequential to multiprocessing. You will probably not get everything right since there are a few details that need to be taken into account such as setting up an individual scratch workspace for each call of the worker function. In addition, to be able to run as a script tool the script needs to be separated into two files with the worker function in its own file. But don't worry about these things, just try to set up the overall structure in the same way as in the Hi Ho Cherry-O multiprocessing version and then place the code from the sequential version of the raster example either in the main function or worker function depending on where you think it needs to go. Then check out the solution linked below.

Click here for one way of implementing the solution [5]

When you run this code, do you notice any performance differences between the sequential and multiprocessor versions?

The sequential version took 96 seconds on the same 4-processor PC we were using in the Cherry-O example, while the multiprocessor version completed in 58 seconds. Again not 4 times faster as we might expect but nearly twice as fast with multiprocessing is a good improvement. For reference, the 32-processor PC from the Cherry-O example processed the sequential code in 110 seconds and the multiprocessing version in 40 seconds. We will look in more detail at the individual lines of code and their performance when we examine code profiling but you might also find it useful to watch the CPU usage tab in Task Manager to see how hard (or not) your PC is working.

Lesson content developed by Jan Wallgrun and James O’Brien

Multiprocessing with vector data

The best practices of multiprocessing that we introduced earlier are even more important when we are working with vector data than they are with raster data. The geodatabase locking issue is likely to become much more of a factor as typically we use more vector data than raster and often geodatabases are used more with feature classes.

The example we’re going to use here involves clipping a feature layer by polygons in another feature layer. A sample use case of this might be if you need to segment one or several infrastructure layers by state or county (or even a smaller subdivision). If I want to provide each state or county with a version of the roads, sewer, water or electricity layers (for example) this would be a helpful script. To test out the code in this section (and also the first homework assignment), you can again use the data from the USA.gdb geodatabase (Section 1.5) we provided. The application then is to clip the data from the roads, cities, or hydrology data sets to the individual state polygons from the States data set in the geodatabase. 

To achieve this task, one could run the Clip tool manually in ArcGIS Pro but if there are a lot of polygons in the clip data set, it will be more effective to write a script that performs the task. As each state/county is unrelated to the others, this is an example of an operation that can be run in parallel.

The code example below has been adapted from a code example written by Duncan Hornby at the University of Southampton in the United Kingdom that has been used to demonstrate multiprocessing and also how to create a script tool that supports multiprocessing. We will take advantage of Mr. Hornby’s efforts and make use of his code (with attribution of course) but we have also reorganized and simplified it quite a bit and added some enhancements.

Let us examine the code’s logic and then we’ll dig into the syntax.

The code has two Python files [6]. This is important because when we want to be able to run it as a script tool in ArcGIS, it is required that the worker function for running the individual tasks be defined in its own module file, not in the main script file for the script tool with the multiprocessing code that calls the worker function. The first file called scripttool.py imports arcpy, multiprocessing, and the worker code contained in the second python file called multicode.py and it contains the definition of the main function mp_handler() responsible for managing the multiprocessing operations similar to the hi_ho_cherry_o multiprocessing version. It uses two script tool parameters, the file containing the polygons to use for clipping (variable clipper) and the file to be clipped (variable tobeclipped). The main function mp_handler() calls the worker(...) function located in the multicode file, passing it the files to be used and other information needed to perform the clipping operation. This will be further explained below . The code for the first file including the main function is shown below.

import arcpy
import multiprocessing as mp
from WorkerScript import clipper

# Input parameters
clipping_fc = arcpy.GetParameterAsText(0) if arcpy.GetParameterAsText(0) else r"C:\489\USA.gdb\States"
data_to_be_clipped = arcpy.GetParameterAsText(1) if arcpy.GetParameterAsText(1) else r"C:\489\USA.gdb\Roads"

def mp_handler():
    try:
        # Create a list of object IDs for clipper polygons
        arcpy.AddMessage("Creating Polygon OID list...")
        clipperDescObj = arcpy.Describe(clipping_fc)
        field = clipperDescObj.OIDFieldName

        # Create the idList by list comprehension and SearchCursor
        idList = [row[0] for row in arcpy.da.SearchCursor(clipping_fc, [field])]

        arcpy.AddMessage(f"There are {len(idList)} object IDs (polygons) to process.")

        # Create a task list with parameter tuples for each call of the worker function. Tuples consist of the clippper, tobeclipped, field, and oid values.
        # adds tuples of the parameters that need to be given to the worker function to the jobs list
        # using list comprehension
        jobs = [(clipping_fc, data_to_be_clipped, field, id) for id in idList]

        arcpy.AddMessage(f"Job list has {len(jobs)} elements.\n Sending to pool")

        cpuNum = mp.cpu_count()  # determine number of cores to use
        arcpy.AddMessage(f"There are: {cpuNum} cpu cores on this machine")

        # Create and run multiprocessing pool.
        with mp.Pool(processes=cpuNum) as pool:  # Create the pool object
            # run jobs in job list; res is a list with return dictionary values from the worker function
            res = pool.starmap(clipper, jobs)

        # After the threads are complete, iterate over the results and check for errors.
        for r in res:
            if r['errorMsg'] != None:
                arcpy.AddError(f'Task {r["name"]} Failed with: {r["errorMsg"]}')

        arcpy.AddMessage("Finished multiprocessing!")

    except Exception as ex:
        arcpy.AddError(ex)


if __name__ == '__main__':
    mp_handler()

Let's now have a close look at the logic of the two main functions which will do the work. The first one is the mp_handler() function shown in the code section above. It takes the input variables and has the job of processing the polygons in the clipping file to get a list of their unique IDs, building a job list of parameter tuples that will be given to the individual calls of the worker function, setting up the multiprocessing pool and running it, and taking care of error handling.

The second function is the worker function called by the pool (named worker in this example) located in the WorkerScript.py file (code shown below). This function takes the name of the clipping feature layer, the name of the layer to be clipped, the name of the field that contains the unique IDs of the polygons in the clipping feature layer, and the feature ID identifying the particular polygon to use for the clipping as parameters. This function will be called from the pool constructed in mp_handler().

The worker function will then make a selection from the clipping layer. This has to happen in the worker function because all parameters given to that function in a multiprocessing scenario need to be of a simple type that can be "pickled." Pickling data [7] means converting it to a byte-stream which in the simplest terms means that the data is converted to a sequence of simple Python types (string, number etc.).  As feature classes are much more complicated than that containing spatial and non-spatial data, they cannot be readily converted to a simple type. That means feature classes cannot be "pickled" and any selections that might have been made in the calling function are not shared with the worker functions. Therefore, we need to think about creative ways of getting our data shared with our sub-processes. In this case, that means we’re not going to do the selection in the master module and pass the polygon to the worker module. Instead, we’re going to create a list of feature IDs that we want to process and we’ll pass an ID from that list as a parameter with each call of the worker function that can then do the selection with that ID on its own before performing the clipping operation. For this, the worker function selects the polygon matching the OID field parameter when creating a layer with MakeFeatureLayer_management() and uses this selection to clip the feature layer to be clipped. The results are saved in a shapefile including the OID in the file's name to ensure that the names are unique.

def clipper(clipper, tobeclipped, field, oid):
    """
       This is the function that gets called and does the work of clipping the input feature class to one of the
       polygons from the clipper feature class. Note that this function does not try to write to arcpy.AddMessage() as
       nothing is ever displayed.
       param: clipper
       param: tobeclipped
       param: field
       param: oid
    """
    # Create result dictionary that will be exclusive to the thread if ran in parallel.
    result_dict = {'name': None, 'errorMsg': None}
    try:
        # Set the name of the clipped to the result dictionary name
        result_dict['name'] = tobeclipped

        # Create a layer with only the polygon with ID oid. Each clipper layer needs a unique name, so we include oid in the layer name.
        query = f"{field} = {oid}"
        tmp_flayer = arcpy.MakeFeatureLayer_management(clipper, f"clipper_{oid}", query)

        # Do the clip. We include the oid in the name of the output feature class.
        outFC = fr"C:\NGA\Lesson 1 Data\output\clip_{oid}.shp"
        arcpy.Clip_analysis(tobeclipped, tmp_flayer, outFC)

        print(f"finished clipping: {oid}")
        return result_dict  # everything went well so we return the dictionary

    except Exception as ex:
        result_dict['errorMsg'] = ex
        # Some error occurred so return the exception thrown.
        print(f"error condition: {ex}")
        return result_dict

Having covered the logic of the code, let's review the specific syntax used to make it all work. While you’re reading this, try visualizing how this code might run sequentially first – that is one polygon being used to clip the to-be-clipped feature class, then another polygon being used to clip the to-be-clipped feature class and so on (maybe through 4 or 5 iterations). Then once you have an understanding of how the code is running sequentially try to visualize how it might run in parallel with the worker function being called 4 times simultaneously and each worker performing its task independently of the other workers.

We’ll start with exploring the syntax within the mp_handler(...) function.

The mp_handler(...) function begins by determining the name of the field that contains the unique IDs of the clipper feature class using the arcpy.Describe(...) function (line 13). The code then uses a Search Cursor to get a list of all of the object (feature) IDs from within the clipper polygon feature class (line 17). This gives us a list of IDs that we can pass to our worker function along with the other parameters. As a check, the length of that list is printed out (line 26).

Next, we create the job list with one entry for each call of the clipper() function we want to make (line 24). Each element in this list is a tuple of the parameters that should be given to that particular call of clipper(). This list will be required when we set up the pool by calling pool.starmap(...). To construct the list, we simply loop through the ID list and append a parameter tuple to the list in variable jobs. The first three parameters will always be the same for all tuples in the job list; only the polygon ID will be different. In the homework assignment for this lesson, you will adapt this code to work with multiple input files to be clipped. As a result, the parameter tuples will vary in both the values for the oid parameter and for the tobeclipped parameter.

To prepare the multiprocessing pool, we start it using the with statement. The code then sets up the size of the pool using the maximum number of processors in line 28 (as we have done in previous examples) and then, using the starmap() method of Pool, calls the worker function clipper(...) once for each parameter tuple in the jobs list (line 34). 

Any outputs from the worker function will be stored in a list of results dictionaries. These are values returned by the clipper() function. The results of each process is iterated over and checked for the Exceptions by checking if the r[‘errorMsg’] key holds a value other than None. 

Let's now look at the code in our worker function clipper(...). As we noted in the logic section above, it receives four parameters: the full paths of the clipping and to-be-clipped feature classes, the name of the field that contains the unique IDs in the clipper feature class, and the OID of the polygon it is to use for the clipping.

We create a results dictionary to help with returning valuable information back to the main processes. The 'name' key will be set to the tobeclipped parameter in line 15. The errorMsg is set to None to indicate that everything went ok as default (line 12 of the clipper function) and would be set to an Exception message to indicate that the operation failed (line 29). In the main function, the results are iterated over to print out any error messages that were encountered during the  clipping process.

Notice that the MakeFeatureLayer_management(...) function in line 19 is used to create an memory layer which is a copy of the original clipper layer. This use of the memory layer is important in three ways: The first is performance – memory layers are faster; second, the use of an memory layer can help prevent any chance of file locking (although not if we were writing back to the file); third, selection only works on layers so even if we wanted to, we couldn’t get away without creating this layer.

The call of MakeFeatureLayer_management(...) also includes an SQL query string defined one line earlier in line 11 to create the layer with just the polygon that matches the oid that was passed as a parameter. The name of the layer we are producing here should be unique; this is why we’re adding str(oid) to the name in the first parameter.

Now with our selection held in our memory, uniquely named feature layer, we perform the clip against our to-be-clipped layer (line 16) and store the result in outFC which we define in line 15 to be a hardcoded folder with a unique name starting with "clip_" followed by the oid. To run the code, you will most likely have to adapt the path used in variable outFC.

The process then returns from the worker function and will be supplied with another oid. This will repeat until a call has been made for each polygon in the clipping feature class.

We are going to use this code as the basis for our Lesson 1 homework project. Have a look at the Assignment Page for full details.

You can test this code out by running it in a number of ways. If you run it from ArcGIS Pro as a script tool, you will have to swap the hashmarks for the clipper and tobeclipped input variables so that GetParameterAsText() is called instead of using hardcoded paths and file names. Be sure to set your parameter type for both parameters to Feature Class. If you make changes to the code and have problems with the changes to the code not being reflected in Pro, delete your Script tool from the Toolbox, restart Pro and re-add the script tool. 

You can run your code from Pyscripter as a stand alone script. Make sure you're running the scripttool.py in PyScripter (not the multicode.py). You can also run your code from the Command Prompt which is the fastest way with the smallest resource overhead.

The final thing to remember about this code is that it has a hardcoded output path defined in variable outFC in the worker() function - which you will want to change, create and/or parameterize etc. so that you have some output to investigate. If you do none of these things then no output will be created.

When the code runs it will create a shapefile for every unique object identifier in the "clipper" shapefile (there are 51 in the States data set from the sample data) named using the OID (that is clip_1.shp - clip_59.shp).

Lesson content developed by Jan Wallgrun and James O’Brien


Source URL:https://www.e-education.psu.edu/ngapython/node/835

Links
[1] https://www.e-education.psu.edu/geog485/node/242 [2] https://www.e-education.psu.edu/ngapython/sites/www.e-education.psu.edu.ngapython/files/Lesson_01/script_files/CherryO.zip [3] https://www.e-education.psu.edu/ngapython/sites/www.e-education.psu.edu.ngapython/files/Lesson_01/files/FoxLake.zip [4] https://docs.python.org/3/howto/logging-cookbook.html [5] https://www.e-education.psu.edu/ngapython/node/846 [6] https://www.e-education.psu.edu/ngapython/sites/www.e-education.psu.edu.ngapython/files/Lesson_01/files/multiprocessing_script.zip [7] https://www.datacamp.com/community/tutorials/pickle-python-tutorial