GEOG 489
Advanced Python Programming for GIS

A solution for implementing the conversion from sequential to multiprocessing

PrintPrint
# Setup _very_ simple timing. 
import time 
start_time = time.time() 
 
import arcpy 
from arcpy.sa import *
 
import os 
import multiprocessing 

arcpy.env.overwriteOutput = True
arcpy.env.workspace = r'C:\489\PSU_LiDAR'
 
## If our rasters aren't in our filter list then drop them from our list. 
def filter_list(fileList,filterList): 
    return[i for i in fileList if any(j in i for j in filterList)] 
  
def worker(raster): 
    ## Note also for performance we're not saving any of the intermediate rasters - they will exist only in memory 
    ## Fill the DEM to remove any sinks 
    # This is the initial workspace but to prevent any file locking we'll create a scratch workspace 
    # just for this raster 
    arcpy.env.scratchWorkspace = os.path.join(arcpy.env.workspace,'f'+raster.replace(".img",""))  # r'C:\Users\yourname\PSU_LiDAR\f'+raster.replace(".img","") 
    ## and we'll check if that scratch folder exists and if not we'll create it. 
    if not os.path.exists(arcpy.env.scratchWorkspace): 
        os.makedirs(arcpy.env.scratchWorkspace)    
    try: 
        FilledRaster = Fill(raster) 
        ## Calculate the Flow Direction (how water runs across the surface) 
        FlowDirRaster = FlowDirection(FilledRaster) 
        ## Calculate the Flow Accumulation (where the water accumulates in the surface) 
        FlowAccRaster = FlowAccumulation(FlowDirRaster) 
        ## Convert the Flow Accumulation to a Stream Network 
        ## We're setting an arbitrary threshold of 100 cells flowing into another cell to set it as part of our stream 
        ## http://pro.arcgis.com/en/pro-app/tool-reference/spatial-analyst/identifying-stream-networks.htm 
        Streams = Con(FlowAccRaster,1,"","Value > 100") 
        ## Convert the Raster Stream network to a feature class 
        output_Polyline = raster.replace(".img",".shp")         
        arcpy.CheckOutExtension("Spatial") 
        arcpy.sa.StreamToFeature(Streams,FlowDirRaster,output_Polyline) 
        arcpy.CheckInExtension("Spatial") 
    except: 
        print ("Errors occured") 
        print (arcpy.GetMessages()) 
        arcpy.AddMessage ("Errors occurred") 
        arcpy.AddMessage(arcpy.GetMessages()) 

def mp_handler():
 
    # Ordinarily we would want all of the rasters I'm filtering by a small set for testing & efficiency 
    # I did this by manually looking up the tile index for the LiDAR and determining an area of interest 
    # tiles ending in 227, 228, 230, 231, 232, 233, 235, 236 
    wildCardList = set(['227','228','230','231','232','233','235','236']) 
 
    # Get a list of rasters in my folder 
    rasters = arcpy.ListRasters("*") 
    new_rasters = filter_list(rasters,wildCardList) 
 
    # setup the multiprocessing pool with the number of cores in the current PC     
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        # use the map function of pool to call the function worker() and pass it a raster 
        pool.map(worker,new_rasters)    
 
if __name__ == '__main__': 
    mp_handler()
    # Output how long the process took. 
    arcpy.AddMessage("--- %s seconds ---" % (time.time() - start_time)) 
    print ("--- %s seconds ---" % (time.time() - start_time)) 

The first thing to note here is that nothing much has changed at the beginning of the script until after the definition of the auxiliary filter_list() function except that we are now also importing multiprocessing as well as the os module which we will need later for setting up the mentioned scratch workspaces to store temporary raster files. 

The code for taking the name of a raster file and then performing the different raster operations that was using a for-loop in the sequential version has been moved to the worker function called worker(). This code function will only process a single raster file though and the name of that file needs to be passed to it as a paramter. One thing that has been added to this code is the creation of a separate scratch workspace folder for the given raster file to make sure that temporary raster files created by the different sub-processes executing the worker() function won't interfere with each other. For this, we first set the arcpy.env.scratchWorkspace variable to a folder name derived from the name of the input raster file in line 23 and then create that folder if it does not already exist in lines 25 and 26.

The mp_handler() function with the main code for setting up the tasks and running the multiprocessing pool contains the code for creating and filtering the list of raster file names. In line 60, we then create the multiprocessing pool in the same way as in the Cherry-O example except that we call the variable pool rather than myPool. Since our worker function also takes just one parameter, we again use pool.map(...) for populating the job pool. The first parameter is the name of our worker function, so worker() in this case. This followed by the list of parameters for the individual calls of that function, so here we can directly use variable new_rasters with the filtered list of raster file names. 

The worker() function does not return anything, so in contrast to the Cherry-O example we do not need to store a result list returned from pool.map(...) in a variable here and there is nothing else happening in the mp_handler() function. The code for calling mp_handler() again looks very much the same as in the Cherry-O example. 

Important: While we included calls of arcpy.AddMessage(...) in the code, you will most likely not be able to execute it as a script tool inside ArcGIS Pro but instead get error messages related to pickling. To resolve this you will have to move the worker() function to a separate Python file and then import this file from the main script file with the rest of the code. This will also involve copying some of the import statements to the second Python file. We are not going to show this here but we will demonstrate this in the next section with an example that processes vector data rather than rasters.

Back to Lesson Text