4 Chaine de traitement

Pour ce TP les données initiales sont accessibles depuis un espace de stockage partagé associé au projet sur le Cluster. Le chemin de data\_folder est /home/2025021/PARTAGE/data_folder. Pensez bien à changer ce Path dans les scripts et n’hésitez pas à vous rendre dans les dossiers en utilisant le terminal disponible sous Jupyter.

4.1 Environnement R

4.1.1 Présentation des bibliothèques utilisées

Les librairies principales sont les suivantes :

  • terra : bibliothèque pour gérer des rasters
  • targets : permet de construire des workflow qui peuvent ensuite être distribué sur des environnements distribués grâce à …
  • crew : qui offre les interfaces de connexion nécessaire pour différents environnements, local évidemment mais aussi distant HPC comme SLURM, SGE, etc.
  • geotargets : une librairie qui étend les capacités de Targets et le rend plus capable de gérer les fichiers spatiaux
  • tarchetypes : une librairie qui permet d’écrire des workflows plus lisible que la syntaxe Targets

4.1.2 Exécution de la Chaîne

Targets nécessite une organisation précise pour fonctionner. Le workflow doit obligatoirement être décrit dans un fichier _targets.R pour que la fonction de contruction tar_make fonctionne. C’est le point d’entrée pour l’execution du workflow.

Pour le moment nous créons ce fichier sans le remplir, nous y reviendrons ensuite.

4.1.2.1 Importation des bibliothèques

Nous avons fait le choix de lister les packages dans un fichier packages.R, qui sont ensuite chargés depuis _targers.R

Vous pouvez ajouter la ligne source("./packages.R") dans _targets.R, au tout début.

4.1.2.2 Définition des fonctions utilitaires

Nous regroupons ensuite toutes nos fonctions utilitaires dans un répertoire R_Script, qui lui aussi sera sourcé depuis le début du fichier _targets.R

Celles-ci sont décrite dans la partie parallélisation en même temps que le Workflow Targets, elles sont regroupés dans un fichier R_Scripts/utils.R.

  • find_years(data_folder) : renvoie une liste des années dans le dossier data_folder
  • find_classification_tiles : construit une liste des chemins vers les rasters classifications et confiance.
  • load_raster charge et renvoie les deux fichiers raster classification et confiance avec la fonction rast() de Terra.

4.1.3 Exécution de la Chaîne

L’execution de la chaîne se fait au travers du fichier _targets.R, c’est pourquoi tous les scripts à suivre sont regroupés dans un dossier R_Scripts :

  • utils.R
  • by_tile_pipeline.R
  • pipeline_def.R
  • pipeline_steps.R

Ces scripts sont sourcés dans _targets.R via les lignes :

source("./packages.R")
source("./R_Scripts/utils.R")
source("./R_Scripts/pipeline_steps.R")
source("./R_Scripts/pipeline_def.R")
source("./R_Scripts/by_tile_pipeline.R")

4.1.3.1 Fonctions principales du pipeline

Ces fonctions sont regroupées dans le script R R_Scripts/pipeline_steps.R

Fonction 1 : binarisation (étape 1)

Concrètement, la fonction réalise deux opérations essentielles :

  • Elle crée une image binaire où seuls les pixels identifiés comme peupleraies (classe 1) reçoivent la valeur 1, tous les autres pixels étant mis à 0. Le résultat est converti au format uint8 (entier 8 bits non signé) réduisant ainsi l’empreinte mémoire et accélérant les traitements ultérieurs.

  • Elle traite l’image de confiance en ne conservant que les valeurs correspondant aux pixels de peupleraies. Ces valeurs sont multipliées par 100 pour les exprimer en pourcentage (0 à 100%) et converties au format int16 (entier 16 bits) offrant un bon compromis entre précision des données et efficacité de traitement.

#' binarize_image
#' 
#' Binarize classification and confidence images.
#' Creates a binary classification (1 where class=1, 0 elsewhere) and
#' scales confidence values by 100 where classification is 1.
#' 
#' @param classif Classification data array
#' @param conf Confidence data array
#' @return Tuple of (binary_classification, scaled_confidence)
binarize_image <- function(classif, conf, params){
    # reclassifie les valeurs 1 = 1 , autres = 0
    # binary = (classif == 1).astype("uint8")
    # reclassification matrix
    m <- rbind(c(2, 0), c(3, 0),c(4, 0), c(4, 0),c(5, 0), c(6, 0),c(7, 0), c(8, 0))
    
    # Classify dataset
    binary = terra::classify(classif, m, include.lowest=TRUE)
    
    # compute masked confidence from binary mask
    confidence_masked = conf * 100 * binary
    
    # control input/output variation (less 1s in output than in input)
      if (DEBUG) {
              message("Values frequency (old classif):")
              print(terra::freq(classif, bylayer=FALSE) |> knitr::kable())
              message("Values frequency (new classif):")
              print(terra::freq(binary, bylayer=FALSE) |> knitr::kable())
              message("Values frequency (old conf):")
              print(terra::freq(conf, bylayer=FALSE) |> knitr::kable())
              message("Values frequency (new conf):")
              print(terra::freq(confidence_masked, bylayer=FALSE) |> knitr::kable())
            } 
    return(list(classif = binary, conf = confidence_masked))
}
Fonction 2 : filtrage à partir de données auxilliaires (étapes 2 et 3)

La fonction apply_threshold_from_aux permet d’affiner la classification en utilisant des données auxiliaires comme la pente ou la hauteur de la végétation.

Cette étape permet d’éliminer des pixels faux positifs détéctés à tort dans les peupleraies grâce à l’intégration de connaissances expertes.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle charge les données auxiliaires (pente ou hauteur) seulement sur l’emprise géographique de la classification, en préservant la résolution spatiale et le géoréférencement.

  • Elle applique un seuil sur ces données auxiliaires pour créer un masque : - Pour la pente : elle élimine les pixels situés sur des pentes trop fortes (peu propices aux peupleraies) - Pour la hauteur : elle permet d’éliminer des pixels de végétation basse ayant une faible hauteur, peu susceptibles d’être des peupleraies

  • Elle met à jour la classification en supprimant les pixels qui ne respectent pas les critères définis par le seuil, en les remplaçant par la valeur 0 (non-peupleraie).

  • Elle masque également l’image de confiance en conséquence, en mettant à 0 les valeurs de confiance pour tous les pixels qui ont été exclus de la classification.

#' apply_threshold_from_aux
#' 
#' Apply threshold to classification based on auxiliary data.
#' @param classif Classification data array 
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (aux_path : path to auxiliary raster (slope or height), threshold: threshold value to apply)
#' @return list of (thresholded_classification, updated_confidence)
apply_threshold_from_aux <- function(classif, conf, params) {

    # Read auxiliary data (slope/height) for the region
    aux_win <- params$aux_path[[params$aux_index]]
    window(aux_win) <- terra::ext(classif)
    aux_array =  aux_win |> terra::resample(classif) # re-align rasters
    
    # Deep copy classif before modifying it
    new_classif = terra::deepcopy(classif)
    
    # create mask by reclassifying auxiliary array
    mask <- classify(x = aux_array,
                     params$reclass_mat,
                     include.lowest=TRUE)
    # test that mask and new_classif are on similar extent
    # print((ext(new_classif) + ext(mask)) / ext(mask))

  tryCatch(
        expr = {
            # Apply threshold to classification
            new_classif = new_classif * mask
            if (DEBUG) {
            message("Successfully applying threshold")
            }
        },
        error = function(e){
            message('Error when applying threshold')
            print(e)
        },
        warning = function(w){
            message('Caught a warning!')
            print(w)
        },
        finally = {
            # Update confidence from updated classif
            new_conf = conf * new_classif
            
            if (DEBUG) {
              message("Values frequency (old classif):")
               print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
              message("Values frequency (new classif):")
               print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
              message("Values frequency (old conf):")
              print(terra::freq(conf, bylayer=FALSE) |> knitr::kable())
              message("Values frequency (new conf):")
              print(terra::freq(new_conf, bylayer=FALSE) |> knitr::kable())
            }

            return(list(classif = new_classif, conf = new_conf))
        }
    ) 
      
}
Fonction 3 : filtrage médian

La fonction apply_median_filter applique un filtre médian sur les valeurs de confiance pour réduire le bruit et homogénéiser les résultats.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle conserve l’image de classification d’origine tout en créant une copie pour les modifications ultérieures.

  • Elle applique un filtre médian sur l’image de confiance avec une fenêtre de taille paramétrable, ce qui permet de lisser les valeurs aberrantes tout en préservant les contours des objets.

  • Elle met à jour la classification en supprimant les pixels dont la confiance est devenue nulle après filtrage, assurant ainsi la cohérence entre les deux couches.

#' Applies a median filter to the confidence array and masks values where classification is 0.
#' @param classif Classification data array 
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (filter_size: Size of the median filter window)
#' @return list of (classification, filtered_confidence)
apply_median_filter <- function(classif, conf, params) {
  
    # Keep classification unchanged
    new_classif = terra::deepcopy(classif)
    
    # Apply median filter to confidence
    new_conf = terra::focal(terra::deepcopy(conf), # new copy of conf
                            w=params$filter_size, 
                            fun="median", # function to apply i.e. mean, sum, etc.
                            na.policy="omit", # "all": keep N, "omit" : remove NA 
                            fillvalue=NA) # which value fill edges outside raster

    # Mask confidence values where classification is 0
    new_conf = new_conf * new_classif
    
    # Update classification to match - set to 0 where confidence is 0
    reclass_mat <- matrix(c(0, 1, 0, # if 0 => 0
                            1, Inf, 1), # from 1 to Infiny => 1
                           ncol=3, byrow=TRUE)
                          
    mask <- classify(x = new_conf,
                     reclass_mat,
                     include.lowest=TRUE)
    new_classif = new_classif * mask
  

  if (DEBUG) {
    message(" In apply_median_filter()")
    message("Values frequency (old classif):")
    print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
    message("Values frequency (new classif):")
    print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
    message("Values frequency (old conf):")
    print(knitr::kable(terra::freq(conf, bylayer=FALSE)))
    message("Values frequency (new conf):")
    print(knitr::kable(terra::freq(new_conf, bylayer=FALSE)))
  }
  
  return(list(classif = classif, conf = conf))
}
Fonction 4 : application d’un seuil de confiance (étape 5)

La fonction apply_confidence_threshold permet d’éliminer les pixels classés comme peupleraies mais avec une faible confiance. Cette étape, appliquée après les filtres précédents, améliore la fiabilité globale de la classification avec un risque limité d’erreur. En effet, ce seuillage a été délibérément placé en fin de chaîne car certaines jeunes peupleraies peuvent initialement présenter des valeurs de confiance plus faibles, et les éliminer dès le début aurait pu conduire à une sous-détection de ces peuplements. Les étapes précédentes ayant déjà écarté de nombreux faux positifs, ce seuillage final peut être appliqué de manière plus sécurisée.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle crée un masque en identifiant les pixels classés comme peupleraies (valeur 1) mais dont la confiance est inférieure au seuil spécifié.

  • Elle met à jour la classification en remplaçant ces pixels de faible confiance par la valeur 0 (non-peupleraie).

  • Elle masque également l’image de confiance en mettant à 0 les valeurs correspondant aux pixels exclus.

#' Apply confidence threshold to classification and confidence maps.
#' @param classif Classification data array 
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (threshold: Confidence threshold value)
#' @return list of (thresholded_classification, updated_confidence)
apply_confidence_threshold <- function(classif, conf, params) {
  
    # Create mask for low confidence pixels
    reclass_mat <- matrix(
      c(0, params$threshold, 0, # if value < threshold => 0
        params$threshold, Inf, 1), # if value >= threshold => 1
        ncol=3, byrow=TRUE)
                          
    mask <- classify(x = terra::deepcopy(conf),
                     reclass_mat,
                     include.lowest=TRUE)
    mask = mask * classif # set value to 0 where classif == 0
    message("Values frequency (mask):")
    print(knitr::kable(terra::freq(mask, bylayer=FALSE)))
    
    # Apply mask to classification and confidence
    new_classif = terra::deepcopy(classif) * mask
    new_conf = terra::deepcopy(conf) * mask
  
    if (DEBUG) {
    message(" Dans apply_confidence_threshold")
    message("Values frequency (old classif):")
    print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
    message("Values frequency (new classif):")
    print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
    message("Values frequency (old conf):")
    print(knitr::kable(terra::freq(conf, bylayer=FALSE)))
    message("Values frequency (new conf):")
    print(knitr::kable(terra::freq(new_conf, bylayer=FALSE)))
    }
  return(list(classif = new_classif, conf = new_conf))
}
Fonction 5 : filtrage des groupes de pixels isolés (étape 6)

La fonction apply_sieve_filter élimine les petits groupes de pixels isolés, permettant de nettoyer la classification des artefacts et d’obtenir des entités plus cohérentes avec la réalité terrain.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle utilise l’algorithme de tamisage (sieve) de rasterio pour identifier les groupes connectés de pixels.

  • Elle supprime les groupes dont la taille est inférieure au seuil spécifié, en considérant soit une connectivité de 4 (pixels adjacents horizontalement et verticalement), soit une connectivité de 8 (incluant aussi les diagonales).

  • Elle met à jour l’image de confiance en conséquence, en masquant les valeurs pour les pixels qui ont été supprimés de la classification.

#' Apply sieve filter to remove small isolated patches.
#' Uses terra's sieve filter to remove connected components smaller than the specified threshold.
#' @param classif Classification data array 
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (size_threshold: Minimum size of connected components to keep, connectivity: Connectivity for connected components (4 or 8))
#' @return list of (filtered_classification, updated_confidence)
apply_sieve_filter <- function(classif, conf, params) {

    # Apply sieve filter
    new_classif = terra::sieve(
      x = terra::deepcopy(classif), # distinct copy of classif
      threshold = params$size_threshold, # set minimum pixel group size threshold
      directions = params$connectivity) # pixels connection : 4 for rook, 8 for queen
    
    # Update confidence
    new_conf = conf * new_classif
    
    if (DEBUG) {
    message(" Dans apply_sieve_filter")
    message("Values frequency (old classif):")
    print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
    message("Values frequency (new classif):")
    print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
    message("Values frequency (old conf):")
    print(knitr::kable(terra::freq(conf, bylayer=FALSE)))
    message("Values frequency (new conf):")
    print(knitr::kable(terra::freq(new_conf, bylayer=FALSE)))
      }
  
  return(list(classif = classif, conf = conf))
}

4.1.3.2 Lancement de la chaîne de post-traitement

Définition du pipeline

Ce fichier qui contient la définition sous forme de list des différentes fonction à lancer, avec les paramètres, est définis dans R_Scripts/pipeline_def.R

SLOPE_THRESHOLD <- 30 # degrees
HEIGHT_THRESHOLD <- 300 # centimeters
FILTER_SIZE <- 3
CONFIDENCE_THRESHOLD <- 30
SIEVE_SIZE_THRESHOLD <- 50
SIEVE_CONNECTIVITY <- 4
DEBUG <- TRUE

# Define the process chain as a list of lists
PROCESS_CHAIN <- list(
  # Step 1: Binarize image
  list(
    name = "01_binarize",
    fun = binarize_image,
    params = list()
  ),

  # Step 2: Apply slope threshold
  list(
    name = "02_slope_threshold",
    fun = apply_threshold_from_aux,
    params = list(
      aux_index = 1,
      threshold = SLOPE_THRESHOLD,
      reclass_mat = matrix(c(0, SLOPE_THRESHOLD, 1,
                             SLOPE_THRESHOLD, Inf, 0),
                           ncol=3, byrow=TRUE)
    )
  ),
    
  # Step 3: Apply height threshold
  list(
    name = "03_height_threshold",
    fun = apply_threshold_from_aux,
    params = list(
      aux_index = 2,
      threshold = HEIGHT_THRESHOLD,
      reclass_mat = matrix(c(0, HEIGHT_THRESHOLD, 0,
                             HEIGHT_THRESHOLD, Inf, 1),
                           ncol=3, byrow=TRUE)
    )
  ),
  
  # Step 4: Apply median filter
  list(
    name = "04_median_filter",
    fun = apply_median_filter,
    params = list(
      filter_size = FILTER_SIZE
    )
  ),
  
  # Step 5: Apply confidence threshold
  list(
    name = "05_confidence_threshold",
    fun = apply_confidence_threshold,
    params = list(
      threshold = CONFIDENCE_THRESHOLD
    )
  ),
  
  # Step 6: Apply sieve filter
  list(
    name = "06_sieve_filter",
    fun = apply_sieve_filter,
    params = list(
      size_threshold = SIEVE_SIZE_THRESHOLD,
      connectivity = SIEVE_CONNECTIVITY
    )
   )
)
Application du pipeline par tuile
#' Process a single tile through selected steps of the pipeline.
#' 
#' @param tile_path Path to the classification tile (str)
#' @param year_folder: Path to the year folder (str)
#' @param process_chain: List of processing steps (list)
#' @param slope_img: Global Slope image (SpatRaster)
#' @param height_img: Glocal Height image (SpatRaster)
#' @param start_step: Name of the first step to run (or None for first step) (str)
#' @param end_step: Name of the last step to run (or None for last step) (str)
#' @param visualize:  Whether to create visualizations (boolean)
#' @return Status message or error
process_tile_pipeline <- function(
    tiles_data, 
    year_folder, 
    process_chain,
    slope_img,
    height_img,
    start_step = NULL, 
    end_step = NULL,
    visualize = FALSE
) {

    s_aux_path <- slope_img
    h_aux_path <- height_img
    
    classif <- tiles_data[1]
    conf <- tiles_data[2]
    
    for (step in process_chain) {
    
      # Apply the step
      step$params <- append(step$params, list(aux_path = list(s_aux_path, h_aux_path)))
    
      returned_list  = step$fun(classif, conf, params = step$params)
      # replace inputs values with produced ones
      conf = returned_list$conf
      classif = returned_list$classif
    }
      message("***** END *****")
      print(paste0(names(classif)," / ", names(conf) ))
            
      return(sprc(classif,conf))
}

A la différence de Python et Dask, qui est capable de parallélisé les traitements avec finalement assez peu de modifications sur le code (dès lors qu’il est bien découpés en fonction), nous devons en R expliciter le graph Targets manuellement, en définissant les embranchements et en positionnant nos appels de fonctions dans des Targets.

C’est pourquoi la suite du fichier _targets.R , qui pilote de façon explicite l’execution de notre chaîne, est plutôt décrite dans la partie Parallélisation de ce TP.

4.2 Environnement Python

4.2.1 Présentation des bibliothèques utilisées

Python est utilisé pour sa richesse en bibliothèques dédiées aux formats de données raster, sa capacité à traiter de gros volumes de données (via Dask notamment), et sa bonne interopérabilité avec les fichiers notamment au formats GeoTIFF et NetCDF.

Les bibliothèques principales sont :

  • xarray : bibliothèque de référence pour manipuler des tableaux multidimensionnels labellisés (comme les cubes temporels ou multi-spectraux). Pour gérer les données géospatiales raster, xarray est souvent cominée à rioxarray, qui ajoute des fonctionnalités spatiales à xarray ce qui permet un traitement dans un format structuré et compatible avec des outils de calcul distribué (Dask).
  • rioxarray : extension de xarray pour gérer les attributs géospatiaux (CRS, extent, etc.) en lisant et écrivant des fichiers raster géoréférencés (GeoTIFF, etc.).
  • rasterio : bibliothèque de bas niveau pour lire/écrire des fichiers raster géospatiaux (basée sur GDAL). Elle est utilisée directement ou à travers rioxarray.
  • matplotlib : bibliothèque de visualisation utilisée pour afficher des rasters, des profils temporels ou des comparaisons.
  • dask : utilisé avec xarray pour paralléliser et optimiser les calculs sur des gros volumes de données raster.

Exemple :

import rioxarray
# Chargement d'une image de classification
raster = rioxarray.open_rasterio("classification.tif")
print(raster)

4.2.2 Exécution de la Chaîne

Pour exécuter la chaîne de post-traitement, il faut d’abord importer les bibliothèques nécessaires et définir quelques fonctions utilitaires.

4.2.2.1 Importation des bibliothèques

# Library imports
import os
import numpy as np
from scipy.ndimage import median_filter
import dask
from dask.delayed import delayed
from dask.distributed import Client
import dask.array as da
from time import time
import datetime
from typing import Dict, Any, Callable, List, Optional, Tuple
import rioxarray as rxr
import xarray as xr
import rasterio
from rasterio.features import sieve
import gc
import matplotlib.pyplot as plt

4.2.2.2 Définition des chemins et des paramètres par défaut

# Configuration paths
DATA_FOLDER = "./data_folder"
SLOPE_IMAGE = "./data_folder/auxiliary/slope_nouvelle_aquitaine.tif"
HEIGHT_IMAGE = "./data_folder/auxiliary/height_nouvelle_aquitaine.tif"
OUTPUT_DIR = os.path.expanduser("~/sageo_hpc_outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Processing parameters
SLOPE_THRESHOLD = 30
HEIGHT_THRESHOLD = 300
FILTER_SIZE = 3
CONFIDENCE_THRESHOLD = 30
SIEVE_SIZE_THRESHOLD = 50
SIEVE_CONNECTIVITY = 4

# Dask configuration
NB_WORKERS = 2
NB_THREADS = 1
MEMORY_LIMIT = '8GB'

4.2.2.3 Fonctions utilitaires

Ces fonctions aident à la manipulation des données raster et sont utilisées par les fonctions principales du pipeline.

def find_years(data_folder: str) -> List[str]:
    """
    Find all year directories within the 'years' subdirectory of the data folder.
    
    Args:
        data_folder: Root data folder containing 'years' and 'auxiliary' directories
        
    Returns:
        List of year directory names
    """
    # Construct path to the years directory
    years_folder = os.path.join(data_folder, "years")
    
    # Find all subdirectories in the years folder
    return [d for d in os.listdir(years_folder) 
            if os.path.isdir(os.path.join(years_folder, d))]


def find_classification_tiles(folder: str) -> List[str]:
    """Find all classification tiles in a folder."""
    classification_folder = os.path.join(folder, "classification")
    if not os.path.exists(classification_folder):
        return []
    return [os.path.join(classification_folder, file) 
            for file in os.listdir(classification_folder) 
            if file.endswith(".tif")]
    
def get_window_from_bounds(bounds, transform, shape):
    """
    Calculate window from bounds, transform and shape.
    
    Args:
        bounds: Tuple of (minx, miny, maxx, maxy)
        transform: Affine transform of the raster
        shape: Shape of the raster (bands, height, width)
        
    Returns:
        rasterio.windows.Window object
    """
    row_start, col_start = ~transform * (bounds[0], bounds[3])
    row_stop, col_stop = ~transform * (bounds[2], bounds[1])
    
    row_stop = min(row_stop, shape[1])
    col_stop = min(col_stop, shape[2])
    
    height = max(row_stop - row_start, shape[1])
    width = max(col_stop - col_start, shape[2])
    
    return rasterio.windows.Window(col_start, row_start, width, height)

def read_auxiliary_data(aux_path, bounds, transform, shape):
    """
    Read auxiliary data (slope or height) for the given bounds.
    
    Args:
        aux_path: Path to auxiliary raster
        bounds: Tuple of (minx, miny, maxx, maxy)
        transform: Affine transform of the target raster
        shape: Shape of the target raster (bands, height, width)
        
    Returns:
        numpy.ndarray: Auxiliary data for the specified region
    """
    with rasterio.open(aux_path) as aux_ds:
        window = get_window_from_bounds(bounds, transform, shape)
        return aux_ds.read(1, window=window)

def determine_active_steps(
    process_chain: List[Dict], 
    start_step: Optional[str] = None, 
    end_step: Optional[str] = None
) -> Tuple[List[str], List[Dict], int, int]:
    """
    Determine which steps to run based on start and end step names.
    
    Args:
        process_chain: List of processing step dictionaries
        start_step: Name of the first step to run (or None for first step)
        end_step: Name of the last step to run (or None for last step)
        
    Returns:
        tuple: (all_step_names, active_steps, start_idx, end_idx)
    """
    all_step_names = [step["name"] for step in process_chain]
    
    start_idx = all_step_names.index(start_step) if start_step and start_step in all_step_names else 0
    end_idx = all_step_names.index(end_step) + 1 if end_step and end_step in all_step_names else len(all_step_names)
    
    return all_step_names, process_chain[start_idx:end_idx], start_idx, end_idx

def generate_report(results, year, output_dir=None):
    """
    Generate a summary report of processing results.
    
    Args:
        results: List of processing results
        year: Year being processed
        output_dir: Directory to save the report (default: current directory)
        
    Returns:
        str: Formatted report text
    """
    # Categorize results
    success = [r for r in results if not str(r).startswith("ERROR")]
    errors = [r for r in results if str(r).startswith("ERROR")]
    
    # Calculate statistics
    total_tiles = len(results)
    success_count = len(success)
    error_count = len(errors)
    success_rate = (success_count / total_tiles * 100) if total_tiles > 0 else 0
    
    # Format the report header with more details
    report = f"Processing Report for Year {year}\n"
    report += f"=" * 50 + "\n"
    report += f"Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
    
    # Add summary statistics
    report += f"Summary:\n"
    report += f"  Total tiles processed: {total_tiles}\n"
    report += f"  Successfully processed: {success_count} ({success_rate:.1f}%)\n"
    report += f"  Failed: {error_count}\n\n"
    
    # Add successful tiles section
    if success:
        report += f"Successfully Processed Tiles:\n"
        for i, tile in enumerate(success, 1):
            report += f"  {i}. {tile}\n"
        report += "\n"
    
    # Add error section with more details
    if errors:
        report += f"Errors:\n"
        for i, err in enumerate(errors, 1):
            # Extract more details from the error message if possible
            error_parts = str(err).split(":", 1)
            if len(error_parts) > 1:
                tile_name = error_parts[0].replace("ERROR", "").strip()
                error_msg = error_parts[1].strip()
                report += f"  {i}. Tile: {tile_name}\n     Error: {error_msg}\n"
            else:
                report += f"  {i}. {err}\n"
    
    # Determine the output path
    if output_dir is None:
        output_dir = OUTPUT_DIR
    
    report_dir = os.path.join(output_dir, "reports")
    os.makedirs(report_dir, exist_ok=True)
    report_path = os.path.join(report_dir, f"report_{year}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
    
    # Write report to file
    with open(report_path, "w") as f:
        f.write(report)
    
    print(f"Report saved to: {report_path}")
    return report

4.2.2.4 Fonctions principales du pipeline

Fonction 1 : binarisation (étape 1)

La fonction binarize_image transforme les images de classification et de confiance en utilisant des DataArray de xarray. Cette structure de données conserve automatiquement les informations géographiques (coordonnées, système de projection) pendant les différentes opérations.

Concrètement, la fonction réalise deux opérations essentielles :

  • Elle crée une image binaire où seuls les pixels identifiés comme peupleraies (classe 1) reçoivent la valeur 1, tous les autres pixels étant mis à 0. Le résultat est converti au format uint8 (entier 8 bits non signé) réduisant ainsi l’empreinte mémoire et accélérant les traitements ultérieurs.

  • Elle traite l’image de confiance en ne conservant que les valeurs correspondant aux pixels de peupleraies. Ces valeurs sont multipliées par 100 pour les exprimer en pourcentage (0 à 100%) et converties au format int16 (entier 16 bits) offrant un bon compromis entre précision des données et efficacité de traitement.

def binarize_image(
    classif: xr.DataArray, 
    conf: xr.DataArray, 
    **kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
    """
    Binarize classification and confidence images.
    
    Creates a binary classification (1 where class=1, 0 elsewhere) and
    scales confidence values by 100 where classification is 1.
    
    Args:
        classif: Classification data array
        conf: Confidence data array
        
    Returns:
        Tuple of (binary_classification, scaled_confidence)
    """
    binary = (classif == 1).astype("uint8")
    confidence_masked = (conf * 100 * binary).astype("int16")
    
    return binary, confidence_masked
Fonction 2 : filtrage à partir de données auxilliaires (étapes 2 et 3)

La fonction apply_threshold_from_aux permet d’affiner la classification en utilisant des données auxiliaires comme la pente ou la hauteur de la végétation.

Cette étape permet d’éliminer des pixels faux positifs détéctés à tort dans les peupleraies grâce à l’intégration de connaissances expertes.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle charge les données auxiliaires (pente ou hauteur) seulement sur l’emprise géographique de la classification, en préservant la résolution spatiale et le géoréférencement.

  • Elle applique un seuil sur ces données auxiliaires pour créer un masque : - Pour la pente : elle élimine les pixels situés sur des pentes trop fortes (peu propices aux peupleraies) - Pour la hauteur : elle permet d’éliminer des pixels de végétation basse ayant une faible hauteur, peu susceptibles d’être des peupleraies

  • Elle met à jour la classification en supprimant les pixels qui ne respectent pas les critères définis par le seuil, en les remplaçant par la valeur 0 (non-peupleraie).

  • Elle masque également l’image de confiance en conséquence, en mettant à 0 les valeurs de confiance pour tous les pixels qui ont été exclus de la classification.

def apply_threshold_from_aux(
    classif: xr.DataArray, 
    conf: xr.DataArray, 
    aux_path: str, 
    threshold: int, 
    filter_out_high_values: bool = True,  # If True, remove values >= threshold
    **kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
    """
    Apply threshold to classification based on auxiliary data.
    
    Reads auxiliary data (slope or height) and sets classification to 0
    where auxiliary values exceed the threshold.
    
    Args:
        classif: Classification data array
        conf: Confidence data array
        aux_path: Path to auxiliary raster (slope or height)
        threshold: Threshold value to apply
        filter_out_high_values: If True, remove pixels where aux 
        threshold; 
                                if False, remove pixels where aux < threshold
        
    Returns:
        Tuple of (thresholded_classification, updated_confidence)
    """
    # Read auxiliary data for the region
    aux_array = read_auxiliary_data(
        aux_path, 
        classif.rio.bounds(), 
        classif.rio.transform(), 
        classif.shape
    )
    
    # Apply threshold
    new_classif = classif.copy(deep=True)
    class_array = new_classif.values[0].copy()
    
    # Create the mask based on the threshold comparison
    if filter_out_high_values:
        mask = aux_array >= threshold  # Remove pixels where aux >= threshold
    else:
        mask = aux_array < threshold   # Remove pixels where aux < threshold
    
    # Apply the mask
    class_array[mask] = 0
    
    new_classif.values[0] = class_array
    
    # Update confidence
    new_conf = conf.copy(deep=True)
    new_conf = new_conf.where(new_classif > 0, 0)
    
    return new_classif, new_conf
Fonction 3 : filtrage médian

La fonction apply_median_filter applique un filtre médian sur les valeurs de confiance pour réduire le bruit et homogénéiser les résultats.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle conserve l’image de classification d’origine tout en créant une copie pour les modifications ultérieures.

  • Elle applique un filtre médian sur l’image de confiance avec une fenêtre de taille paramétrable, ce qui permet de lisser les valeurs aberrantes tout en préservant les contours des objets.

  • Elle met à jour la classification en supprimant les pixels dont la confiance est devenue nulle après filtrage, assurant ainsi la cohérence entre les deux couches.

def apply_median_filter(
    classif: xr.DataArray, 
    conf: xr.DataArray, 
    filter_size: int, 
    **kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
    """
    Apply median filter to confidence values.
    
    Applies a median filter to the confidence array and masks values
    where classification is 0.
    
    Args:
        classif: Classification data array
        conf: Confidence data array
        filter_size: Size of the median filter window
        
    Returns:
        Tuple of (classification, filtered_confidence)
    """
    # Keep classification unchanged
    new_classif = classif.copy(deep=True)
    
    # Apply median filter to confidence
    conf_array = conf.values[0].copy()
    filtered_array = median_filter(conf_array, size=filter_size)
    
    # Create new confidence array
    new_conf = conf.copy(deep=True)
    new_conf.values[0] = filtered_array
    
    # Mask confidence values where classification is 0
    new_conf = new_conf.where(new_classif > 0, 0)
    
    # Update classification to match - set to 0 where confidence is 0
    new_classif_array = new_classif.values[0]
    new_classif_array[new_conf.values[0] == 0] = 0
    new_classif.values[0] = new_classif_array
    
    return new_classif, new_conf
Fonction 4 : application d’un seuil de confiance (étape 5)

La fonction apply_confidence_threshold permet d’éliminer les pixels classés comme peupleraies mais avec une faible confiance. Cette étape, appliquée après les filtres précédents, améliore la fiabilité globale de la classification avec un risque limité d’erreur. En effet, ce seuillage a été délibérément placé en fin de chaîne car certaines jeunes peupleraies peuvent initialement présenter des valeurs de confiance plus faibles, et les éliminer dès le début aurait pu conduire à une sous-détection de ces peuplements. Les étapes précédentes ayant déjà écarté de nombreux faux positifs, ce seuillage final peut être appliqué de manière plus sécurisée.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle crée un masque en identifiant les pixels classés comme peupleraies (valeur 1) mais dont la confiance est inférieure au seuil spécifié.

  • Elle met à jour la classification en remplaçant ces pixels de faible confiance par la valeur 0 (non-peupleraie).

  • Elle masque également l’image de confiance en mettant à 0 les valeurs correspondant aux pixels exclus.

def apply_confidence_threshold(
    classif: xr.DataArray, 
    conf: xr.DataArray, 
    threshold: int, 
    **kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
    """
    Apply confidence threshold to classification and confidence maps.
    
    Sets classification to 0 where confidence is below the threshold.
    
    Args:
        classif: Classification data array
        conf: Confidence data array
        threshold: Confidence threshold value
        
    Returns:
        Tuple of (thresholded_classification, thresholded_confidence)
    """
    # Create mask for low confidence pixels
    mask = (conf < threshold) & (classif == 1)
    
    # Apply mask to classification and confidence
    new_classif = classif.where(~mask, 0)
    new_conf = conf.where(~mask, 0)
    
    return new_classif, new_conf
Fonction 5 : filtrage des groupes de pixels isolés (étape 6)

La fonction apply_sieve_filter élimine les petits groupes de pixels isolés, permettant de nettoyer la classification des artefacts et d’obtenir des entités plus cohérentes avec la réalité terrain.

Concrètement, la fonction réalise les opérations suivantes :

  • Elle utilise l’algorithme de tamisage (sieve) de rasterio pour identifier les groupes connectés de pixels.

  • Elle supprime les groupes dont la taille est inférieure au seuil spécifié, en considérant soit une connectivité de 4 (pixels adjacents horizontalement et verticalement), soit une connectivité de 8 (incluant aussi les diagonales).

  • Elle met à jour l’image de confiance en conséquence, en masquant les valeurs pour les pixels qui ont été supprimés de la classification.

def apply_sieve_filter(
    classif: xr.DataArray, 
    conf: xr.DataArray, 
    size_threshold: int, 
    connectivity: int, 
    **kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
    """
    Apply sieve filter to remove small isolated patches.
    
    Uses rasterio's sieve filter to remove connected components
    smaller than the specified threshold.
    
    Args:
        classif: Classification data array
        conf: Confidence data array
        size_threshold: Minimum size of connected components to keep
        connectivity: Connectivity for connected components (4 or 8)
        
    Returns:
        Tuple of (filtered_classification, updated_confidence)
    """
    # Get classification as numpy array
    class_array = classif.values[0].copy()
    
    # Apply sieve filter
    filtered_array = sieve(class_array, size_threshold, connectivity=connectivity)
    
    # Create new classification array
    new_classif = classif.copy(deep=True)
    new_classif.values[0] = filtered_array
    
    # Update confidence
    new_conf = conf.where(new_classif > 0, 0)
    
    return new_classif, new_conf

4.2.2.5 Fonction de visualisation

def visualize_processing_steps(original_classif, step_results, tile_name):
    """
    Visualize processing steps for a tile.
    
    Args:
        original_classif: Original classification data array
        step_results: List of dictionaries with intermediate results
        tile_name: Name of the tile for display
        
    Returns:
        matplotlib.figure.Figure: Figure with visualizations
    """
    # Create a figure with one row per step
    n_steps = len(step_results)
    n_cols = 3 
    n_rows = (n_steps + 1 + n_cols - 1) // n_cols 
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(n_cols * 4, n_rows * 3.5))
    axes = axes.flatten()

  
    
    # Subsample for display
    max_size = 1000
    if original_classif.shape[1] > max_size or original_classif.shape[2] > max_size:
        scale_factor = max(original_classif.shape[1], original_classif.shape[2]) // max_size + 1
        display_data = original_classif.values[0][::scale_factor, ::scale_factor]
    else:
        display_data = original_classif.values[0]
    
    # Display original
    axes[0].imshow(display_data, cmap='binary')
    axes[0].set_title("Original")
    axes[0].set_xticks([])
    axes[0].set_yticks([])
    
    # Calculate initial percentage
    poplar_pixels = np.sum(original_classif.values[0] == 1)
    total_pixels = original_classif.values[0].size
    percent = (poplar_pixels / total_pixels) * 100
    axes[0].set_xlabel(f"Total pixels: {total_pixels}")
    
    # Variable to store previous pixel count
    prev_poplar_pixels = poplar_pixels
    
    # Display each step
    for i, step in enumerate(step_results):
        # Subsample for display
        if step["classif"].shape[1] > max_size or step["classif"].shape[2] > max_size:
            scale_factor = max(step["classif"].shape[1], step["classif"].shape[2]) // max_size + 1
            display_data = step["classif"].values[0][::scale_factor, ::scale_factor]
        else:
            display_data = step["classif"].values[0]
        
        # Display image
        axes[i+1].imshow(display_data, cmap='binary')
        axes[i+1].set_title(step["name"].split("_")[1] if "_" in step["name"] else step["name"])
        axes[i+1].set_xticks([])
        axes[i+1].set_yticks([])
        
        # Calculate percentage and reduction
        poplar_pixels = np.sum(step["classif"].values[0] > 0)
        total_pixels = step["classif"].values[0].size
        percent = (poplar_pixels / total_pixels) * 100
        
        if prev_poplar_pixels > 0:
            reduction = 100 - (poplar_pixels / prev_poplar_pixels * 100)
            axes[i+1].set_xlabel(f"Poplar pixels: {poplar_pixels} (-{reduction:.1f}%)")
        else:
            axes[i+1].set_xlabel(f"Poplar pixels: {poplar_pixels}")
        
        prev_poplar_pixels = poplar_pixels
        
    # Mask unused axes
    for j in range(7, len(axes)):
        axes[j].axis('off')
    
    # Add global title
    plt.suptitle(f"Post-processing results for tile {tile_name}", fontsize=14)
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    
    return fig

4.2.2.6 Lancement de la chaîne de post-traitement

Définition du pipeline
# Define the processing chain
PROCESS_CHAIN = [
    {
        "name": "01_binarize", 
        "function": binarize_image,
        "params": {}
    },
    {
        "name": "02_slope_threshold", 
        "function": apply_threshold_from_aux, 
        "params": {
            "aux_path": SLOPE_IMAGE,
            "threshold": SLOPE_THRESHOLD,
            "filter_out_high_values": True
        }
    },
    {
        "name": "03_height_threshold", 
        "function": apply_threshold_from_aux, 
        "params": {
            "aux_path": HEIGHT_IMAGE,
            "threshold": HEIGHT_THRESHOLD,
            "filter_out_high_values": False 
        }
    },
    {
        "name": "04_median_filter", 
        "function": apply_median_filter, 
        "params": {
            "filter_size": FILTER_SIZE
        }
    },
    {
        "name": "05_confidence_threshold",
        "function": apply_confidence_threshold,
        "params": {
            "threshold": CONFIDENCE_THRESHOLD
        }
    },
    {
        "name": "06_sieve_filter",
        "function": apply_sieve_filter,
        "params": {
            "size_threshold": SIEVE_SIZE_THRESHOLD,
            "connectivity": SIEVE_CONNECTIVITY
        }
    }
]
Application du pipeline par tuile
def process_tile_pipeline(
    tile_path: str, 
    year_folder: str, 
    process_chain: List[Dict], 
    start_step: Optional[str] = None, 
    end_step: Optional[str] = None,
    visualize: bool = False,
    output_dir: Optional[str] = None

) -> str:
    """
    Process a single tile through selected steps of the pipeline.
    
    Args:
        tile_path: Path to the classification tile
        year_folder: Path to the year folder
        process_chain: List of processing steps
        start_step: Name of the first step to run (or None for first step)
        end_step: Name of the last step to run (or None for last step)
        visualize: Whether to create visualizations
        
    Returns:
        str: Status message or error
    """
    
    # Use OUTPUT_DIR if no custom output directory is provided
    if output_dir is None:
        output_dir = OUTPUT_DIR

    # Prepare input paths
    conf_path = tile_path.replace("classification", "confiance")
    
    # Get the tile name from the classification file name
    tile_name = os.path.basename(tile_path).split("_")[1]
    
    print(f"Starting pipeline for tile: {tile_name}")
    
    # Determine which steps to run
    _, active_steps, _, _ = determine_active_steps(process_chain, start_step, end_step)

    # Load data
    classif = rxr.open_rasterio(tile_path)
    conf = rxr.open_rasterio(conf_path)
    
    # Save original classification for visualization
    classif_original = classif.copy() if visualize else None
    
    # Store intermediate results for visualization if needed
    step_results = [] if visualize else None

    
    # Process each step
    for step in active_steps:
        print(f"  Processing {step['name']} for {tile_name}")
        # Apply the step
        classif, conf = step["function"](classif, conf, **(step["params"] or {}))
        
        # Store intermediate result if visualizing
        if visualize:
            step_results.append({
                "name": step["name"],
                "classif": classif.copy(),
                "conf": conf.copy()
            })
            
        gc.collect()

    # Write results

    # Create output folders
    # Create output folders
    year = os.path.basename(year_folder)
    year_output_dir = os.path.join(output_dir, year)

    # Create year output directory
    os.makedirs(year_output_dir, exist_ok=True)
            
    output_classif_folder = os.path.join(year_output_dir, "classification")
    output_conf_folder = os.path.join(year_output_dir, "confiance")
    os.makedirs(output_classif_folder, exist_ok=True)
    os.makedirs(output_conf_folder, exist_ok=True)

    # Write files
    classif.rio.to_raster(os.path.join(output_classif_folder, os.path.basename(tile_path)), 
                            compress="lzw")
    conf.rio.to_raster(os.path.join(output_conf_folder, os.path.basename(conf_path)), 
                        compress="lzw")
    
    # Create visualization if requested
    if visualize:
        # Create visualization folder
        vis_folder = os.path.join(year_output_dir, "visualization")
        os.makedirs(vis_folder, exist_ok=True)

        # Generate visualization
        fig = visualize_processing_steps(
            classif_original, 
            step_results, 
            tile_name
        )
        
        # Save image
        fig.savefig(os.path.join(vis_folder, f"{tile_name}_steps.png"), 
                    dpi=200, 
                    bbox_inches='tight')
        plt.close(fig)
            
        # Clean up
        classif, conf = None, None
        gc.collect()
    
    print(f"Completed pipeline for tile: {tile_name}")
    return tile_name
Application du pipeline pour toutes les tuiles et les années avec map_blocks
def run_pipeline_years_map_blocks(
    data_folder: str,
    process_chain: List[Dict],
    start_step: Optional[str] = None,
    end_step: Optional[str] = None,
    n_workers: int = NB_WORKERS,
    n_threads: int = NB_THREADS,
    visualize: bool = True,
    memory_limit: str = MEMORY_LIMIT,
    output_dir: Optional[str] = None
):
    """
    Run the processing pipeline for all years and tiles using Dask's map_blocks approach.
    """

    # Use OUTPUT_DIR if no custom output directory is provided
    if output_dir is None:
        output_dir = OUTPUT_DIR

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    total_start = time()
    
    print(f"Starting pipeline with {n_workers} workers and {n_threads} threads per worker (map_blocks implementation)")
    print(f"Processing steps: {start_step or 'beginning'} to {end_step or 'end'}")
    
    # Create Dask client with specified configuration
    client = Client(
        processes=True,
        n_workers=n_workers,
        threads_per_worker=n_threads,
        memory_limit=memory_limit
    )
    
    # Find all years
    years = find_years(data_folder)
    print(f"Found {len(years)} years to process")
    
    # Process each year
    for year in years:
        year_folder = os.path.join(data_folder, "years", year)
        year_output_dir = os.path.join(output_dir, year)

        # Create year output directory
        os.makedirs(year_output_dir, exist_ok=True)

        # Find input tiles
        input_folder = os.path.join(year_folder, "inputs")
        tile_paths = find_classification_tiles(input_folder)
        print(f"Processing {len(tile_paths)} tiles for year {year}")
        
        # Define a function to process tiles by index
        def process_tile_by_index(block):
            results = []
            for idx in block:
                idx = int(idx)
                if idx < len(tile_paths):
                    result = process_tile_pipeline(
                        tile_path=tile_paths[idx],
                        year_folder=year_folder,
                        process_chain=process_chain,
                        start_step=start_step,
                        end_step=end_step,
                        visualize=visualize,
                        output_dir=output_dir 
                    )
                    results.append(result)
                else:
                    results.append(None)
            return np.array(results, dtype=object)
        
        # Create a dummy array with indices for each tile
        dummy_array = da.arange(len(tile_paths), chunks=1)
        
        # Apply map_blocks to process all tiles
        results_array = dummy_array.map_blocks(
            process_tile_by_index,
            dtype=object
        )
        
        # Compute results
        year_results = results_array.compute()
        
        # Generate report for the year
        generate_report(year_results.tolist(), year, output_dir=year_output_dir)
        print(f"Year {year} completed: {len(year_results)} tiles processed")
    
    print(f"Total processing time: {time() - total_start:.2f} seconds")

    # Ensure client is closed even if an error occurs
    client.close()
    print("Dask client closed")
Bloc principal d’exécution
if __name__ == '__main__':
    run_pipeline_years_map_blocks(
        data_folder=DATA_FOLDER,
        process_chain=PROCESS_CHAIN,
        n_workers=NB_WORKERS,
        n_threads=NB_THREADS,
        visualize=True,
    )

4.2.3 Map_blocks versus Futures

La parallélisation du traitement des tuiles raster peut être réalisée de différentes manières avec Dask.

Notre implémentation utilise map_blocks, une approche de haut niveau qui applique une fonction à des blocs de données indépendants. Dans le code utilisé, chaque bloc contient un indice unique (chunks=1) qui pointe vers une tuile raster spécifique à traiter :

dummy_array = da.arange(len(tile_paths), chunks=1)

Le paramètre chunks définit la taille des blocs traités en parallèle. Par exemple, avec chunks=2, chaque bloc contiendrait 2 indices consécutifs, ce qui réduirait le nombre de tâches parallèles. Des valeurs plus élevées peuvent être efficaces pour de petites tuiles mais risquent de compromettre l’optimisation de la distribution des ressources entre les workers.

Ici, chque bloc est indépendant car : - Le traitement d’une tuile ne dépend pas du résultat du traitement d’une autre tuile - Chaque tuile peut être chargée, traitée et sauvegardée séparément - Il n’y a pas de communication nécessaire entre les traitements des différentes tuiles

Dask exploite cette indépendance pour paralléliser les traitements :

  • Distribution : chaque bloc peut être envoyé à n’importe quel worker disponible
  • Parallélisation : plusieurs blocs peuvent être traités simultanément sur différents cœurs ou machines
  • Tolérance aux pannes : si le traitement d’un bloc échoue, seul ce bloc doit être recalculé

Contrairement aux futures explicites créées avec dask.delayed(), map_blocks abstrait la gestion des tâches individuelles en traitant automatiquement la distribution du travail. Dans notre pipeline, nous créons un tableau d’indices (dummy_array = da.arange(len(tile_paths), chunks=1)) où chaque indice correspond à une tuile à traiter. La fonction map_blocks applique ensuite process_tile_by_index à chaque bloc (contenant un seul indice), permettant ainsi de traiter chaque tuile indépendamment et en parallèle.

Une approche alternative utilisant des futures explicites. Une future dans Dask est essentiellement une promesse de résultat : un objet qui représente un calcul qui sera exécuté ultérieurement. Une implémentation avec des futures ressemblerait à ceci :

def run_pipeline_years(
    data_folder: str,
    process_chain: List[Dict],
    start_step: Optional[str] = None,
    end_step: Optional[str] = None,
    n_workers: int = NB_WORKERS,
    n_threads: int = NB_THREADS,
    visualize: bool = True,
    memory_limit: str = MEMORY_LIMIT,
):
    """
    Run the processing pipeline for all years and tiles.
    """
    total_start = time()
    
    print(f"Starting pipeline with {n_workers} workers and {n_threads} threads per worker")
    print(f"Processing steps: {start_step or 'beginning'} to {end_step or 'end'}")
    
    # Create Dask client with specified configuration
    client = Client(
        processes=True,
        n_workers=n_workers,
        threads_per_worker=n_threads,
        memory_limit=memory_limit
    )
    

    # Find all years
    years = find_years(data_folder)
    print(f"Found {len(years)} years to process")
    
    # Create a list to store all futures
    all_futures = []
    
    # Submit tasks for all tiles across all years
    for year in years:
        year_folder = os.path.join(data_folder, "years", year)
        
        # Find input tiles
        input_folder = os.path.join(year_folder, "inputs")
        tile_paths = find_classification_tiles(input_folder)
        print(f"Submitting {len(tile_paths)} tiles for year {year}")
        
        # Submit each tile as a separate task
        for tile_path in tile_paths:
            future = client.submit(
                process_tile_pipeline,
                tile_path=tile_path,
                year_folder=year_folder,
                process_chain=process_chain,
                start_step=start_step,
                end_step=end_step,
                visualize=visualize
            )
            # Store the future with its year for later reporting
            all_futures.append((year, future))
    
    # Process results as they complete
    year_results = {}
    for year, future in all_futures:
        result = future.result()  # This will wait for the task to complete
        
        # Initialize year results if needed
        if year not in year_results:
            year_results[year] = []
            
        # Add result to the appropriate year
        year_results[year].append(result)
        
        # Generate report when all tiles for a year are done
        if len(year_results[year]) == len([f for y, f in all_futures if y == year]):
            report_dir = os.path.join(data_folder, "years", year, "outputs", "reports")
            generate_report(year_results[year], year, output_dir=report_dir)
            print(f"Year {year} completed: {len(year_results[year])} tiles processed")
    
    print(f"Total processing time: {time() - total_start:.2f} seconds")
        

    # Ensure client is closed even if an error occurs
    client.close()
    print("Dask client closed")


if __name__ == '__main__':
    # Run the pipeline with all steps
    run_pipeline_years(
        data_folder=DATA_FOLDER,
        process_chain=PROCESS_CHAIN,
        n_workers=NB_WORKERS,
        n_threads=NB_THREADS,
        visualize=True,
    )

Cette approche avec les futures explicites fonctionne différemment de map_blocks. Voici comment elle opère en détail:

  • Chaque tuile est soumise individuellement au planificateur Dask via client.submit() ou dask.delayed() qui enregistre la tâche sans l’exécuter immédiatement
  • L’exécution est déclenchée soit par future.result() pour une future individuelle soit par dask.compute(*futures) pour calculer plusieurs futures en parallèle. Les workers disponibles traitent alors ces tâches simultanément et les résultats sont récupérés une fois les calculs terminés.

Cette approche avec futures offre plus de contrôle sur chaque tâche individuelle et permet de construire des graphes de dépendances plus complexes, mais nécessite une gestion plus explicite du parallélisme. Dans notre pipeline, nous avons privilégié map_blocks pour sa concision et son intégration naturelle avec les tableaux Dask, permettant ainsi de traiter chaque tuile indépendamment et en parallèle tout en bénéficiant des optimisations internes de Dask.