4 Chaine de traitement
Pour ce TP les données initiales sont accessibles depuis un espace de stockage partagé associé au projet sur le Cluster. Le chemin de data\_folder est /home/2025021/PARTAGE/data_folder. Pensez bien à changer ce Path dans les scripts et n’hésitez pas à vous rendre dans les dossiers en utilisant le terminal disponible sous Jupyter.
4.1 Environnement R
4.1.1 Présentation des bibliothèques utilisées
Les librairies principales sont les suivantes :
-
terra: bibliothèque pour gérer des rasters -
targets: permet de construire des workflow qui peuvent ensuite être distribué sur des environnements distribués grâce à … -
crew: qui offre les interfaces de connexion nécessaire pour différents environnements, local évidemment mais aussi distant HPC comme SLURM, SGE, etc. -
geotargets: une librairie qui étend les capacités de Targets et le rend plus capable de gérer les fichiers spatiaux -
tarchetypes: une librairie qui permet d’écrire des workflows plus lisible que la syntaxe Targets
4.1.2 Exécution de la Chaîne
Targets nécessite une organisation précise pour fonctionner. Le workflow doit obligatoirement être décrit dans un fichier _targets.R pour que la fonction de contruction tar_make fonctionne. C’est le point d’entrée pour l’execution du workflow.
Pour le moment nous créons ce fichier sans le remplir, nous y reviendrons ensuite.
4.1.2.1 Importation des bibliothèques
Nous avons fait le choix de lister les packages dans un fichier packages.R, qui sont ensuite chargés depuis _targers.R
library(terra)
library(targets)
library(geotargets)
library(fs)
library(crew)
library(tarchetypes)
library(visNetwork)
library(crew.cluster)
library(autometric)Vous pouvez ajouter la ligne source("./packages.R") dans _targets.R, au tout début.
4.1.2.2 Définition des fonctions utilitaires
Nous regroupons ensuite toutes nos fonctions utilitaires dans un répertoire R_Script, qui lui aussi sera sourcé depuis le début du fichier _targets.R
Celles-ci sont décrite dans la partie parallélisation en même temps que le Workflow Targets, elles sont regroupés dans un fichier R_Scripts/utils.R.
-
find_years(data_folder): renvoie une liste des années dans le dossierdata_folder -
find_classification_tiles: construit une liste des chemins vers les rasters classifications et confiance. -
load_rastercharge et renvoie les deux fichiers raster classification et confiance avec la fonctionrast()de Terra.
4.1.3 Exécution de la Chaîne
L’execution de la chaîne se fait au travers du fichier _targets.R, c’est pourquoi tous les scripts à suivre sont regroupés dans un dossier R_Scripts :
- utils.R
- by_tile_pipeline.R
- pipeline_def.R
- pipeline_steps.R
Ces scripts sont sourcés dans _targets.R via les lignes :
source("./packages.R")
source("./R_Scripts/utils.R")
source("./R_Scripts/pipeline_steps.R")
source("./R_Scripts/pipeline_def.R")
source("./R_Scripts/by_tile_pipeline.R")4.1.3.1 Fonctions principales du pipeline
Ces fonctions sont regroupées dans le script R R_Scripts/pipeline_steps.R
Fonction 1 : binarisation (étape 1)
Concrètement, la fonction réalise deux opérations essentielles :
Elle crée une image binaire où seuls les pixels identifiés comme peupleraies (classe 1) reçoivent la valeur 1, tous les autres pixels étant mis à 0. Le résultat est converti au format
uint8(entier 8 bits non signé) réduisant ainsi l’empreinte mémoire et accélérant les traitements ultérieurs.Elle traite l’image de confiance en ne conservant que les valeurs correspondant aux pixels de peupleraies. Ces valeurs sont multipliées par 100 pour les exprimer en pourcentage (0 à 100%) et converties au format
int16(entier 16 bits) offrant un bon compromis entre précision des données et efficacité de traitement.
#' binarize_image
#'
#' Binarize classification and confidence images.
#' Creates a binary classification (1 where class=1, 0 elsewhere) and
#' scales confidence values by 100 where classification is 1.
#'
#' @param classif Classification data array
#' @param conf Confidence data array
#' @return Tuple of (binary_classification, scaled_confidence)
binarize_image <- function(classif, conf, params){
# reclassifie les valeurs 1 = 1 , autres = 0
# binary = (classif == 1).astype("uint8")
# reclassification matrix
m <- rbind(c(2, 0), c(3, 0),c(4, 0), c(4, 0),c(5, 0), c(6, 0),c(7, 0), c(8, 0))
# Classify dataset
binary = terra::classify(classif, m, include.lowest=TRUE)
# compute masked confidence from binary mask
confidence_masked = conf * 100 * binary
# control input/output variation (less 1s in output than in input)
if (DEBUG) {
message("Values frequency (old classif):")
print(terra::freq(classif, bylayer=FALSE) |> knitr::kable())
message("Values frequency (new classif):")
print(terra::freq(binary, bylayer=FALSE) |> knitr::kable())
message("Values frequency (old conf):")
print(terra::freq(conf, bylayer=FALSE) |> knitr::kable())
message("Values frequency (new conf):")
print(terra::freq(confidence_masked, bylayer=FALSE) |> knitr::kable())
}
return(list(classif = binary, conf = confidence_masked))
}Fonction 2 : filtrage à partir de données auxilliaires (étapes 2 et 3)
La fonction apply_threshold_from_aux permet d’affiner la classification en utilisant des données auxiliaires comme la pente ou la hauteur de la végétation.
Cette étape permet d’éliminer des pixels faux positifs détéctés à tort dans les peupleraies grâce à l’intégration de connaissances expertes.
Concrètement, la fonction réalise les opérations suivantes :
Elle charge les données auxiliaires (pente ou hauteur) seulement sur l’emprise géographique de la classification, en préservant la résolution spatiale et le géoréférencement.
Elle applique un seuil sur ces données auxiliaires pour créer un masque : - Pour la pente : elle élimine les pixels situés sur des pentes trop fortes (peu propices aux peupleraies) - Pour la hauteur : elle permet d’éliminer des pixels de végétation basse ayant une faible hauteur, peu susceptibles d’être des peupleraies
Elle met à jour la classification en supprimant les pixels qui ne respectent pas les critères définis par le seuil, en les remplaçant par la valeur 0 (non-peupleraie).
Elle masque également l’image de confiance en conséquence, en mettant à 0 les valeurs de confiance pour tous les pixels qui ont été exclus de la classification.
#' apply_threshold_from_aux
#'
#' Apply threshold to classification based on auxiliary data.
#' @param classif Classification data array
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (aux_path : path to auxiliary raster (slope or height), threshold: threshold value to apply)
#' @return list of (thresholded_classification, updated_confidence)
apply_threshold_from_aux <- function(classif, conf, params) {
# Read auxiliary data (slope/height) for the region
aux_win <- params$aux_path[[params$aux_index]]
window(aux_win) <- terra::ext(classif)
aux_array = aux_win |> terra::resample(classif) # re-align rasters
# Deep copy classif before modifying it
new_classif = terra::deepcopy(classif)
# create mask by reclassifying auxiliary array
mask <- classify(x = aux_array,
params$reclass_mat,
include.lowest=TRUE)
# test that mask and new_classif are on similar extent
# print((ext(new_classif) + ext(mask)) / ext(mask))
tryCatch(
expr = {
# Apply threshold to classification
new_classif = new_classif * mask
if (DEBUG) {
message("Successfully applying threshold")
}
},
error = function(e){
message('Error when applying threshold')
print(e)
},
warning = function(w){
message('Caught a warning!')
print(w)
},
finally = {
# Update confidence from updated classif
new_conf = conf * new_classif
if (DEBUG) {
message("Values frequency (old classif):")
print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
message("Values frequency (new classif):")
print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
message("Values frequency (old conf):")
print(terra::freq(conf, bylayer=FALSE) |> knitr::kable())
message("Values frequency (new conf):")
print(terra::freq(new_conf, bylayer=FALSE) |> knitr::kable())
}
return(list(classif = new_classif, conf = new_conf))
}
)
}Fonction 3 : filtrage médian
La fonction apply_median_filter applique un filtre médian sur les valeurs de confiance pour réduire le bruit et homogénéiser les résultats.
Concrètement, la fonction réalise les opérations suivantes :
Elle conserve l’image de classification d’origine tout en créant une copie pour les modifications ultérieures.
Elle applique un filtre médian sur l’image de confiance avec une fenêtre de taille paramétrable, ce qui permet de lisser les valeurs aberrantes tout en préservant les contours des objets.
Elle met à jour la classification en supprimant les pixels dont la confiance est devenue nulle après filtrage, assurant ainsi la cohérence entre les deux couches.
#' Applies a median filter to the confidence array and masks values where classification is 0.
#' @param classif Classification data array
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (filter_size: Size of the median filter window)
#' @return list of (classification, filtered_confidence)
apply_median_filter <- function(classif, conf, params) {
# Keep classification unchanged
new_classif = terra::deepcopy(classif)
# Apply median filter to confidence
new_conf = terra::focal(terra::deepcopy(conf), # new copy of conf
w=params$filter_size,
fun="median", # function to apply i.e. mean, sum, etc.
na.policy="omit", # "all": keep N, "omit" : remove NA
fillvalue=NA) # which value fill edges outside raster
# Mask confidence values where classification is 0
new_conf = new_conf * new_classif
# Update classification to match - set to 0 where confidence is 0
reclass_mat <- matrix(c(0, 1, 0, # if 0 => 0
1, Inf, 1), # from 1 to Infiny => 1
ncol=3, byrow=TRUE)
mask <- classify(x = new_conf,
reclass_mat,
include.lowest=TRUE)
new_classif = new_classif * mask
if (DEBUG) {
message(" In apply_median_filter()")
message("Values frequency (old classif):")
print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
message("Values frequency (new classif):")
print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
message("Values frequency (old conf):")
print(knitr::kable(terra::freq(conf, bylayer=FALSE)))
message("Values frequency (new conf):")
print(knitr::kable(terra::freq(new_conf, bylayer=FALSE)))
}
return(list(classif = classif, conf = conf))
}Fonction 4 : application d’un seuil de confiance (étape 5)
La fonction apply_confidence_threshold permet d’éliminer les pixels classés comme peupleraies mais avec une faible confiance.
Cette étape, appliquée après les filtres précédents, améliore la fiabilité globale de la classification avec un risque limité d’erreur.
En effet, ce seuillage a été délibérément placé en fin de chaîne car certaines jeunes peupleraies peuvent initialement présenter des valeurs de confiance plus faibles, et les éliminer dès le début aurait pu conduire à une sous-détection de ces peuplements.
Les étapes précédentes ayant déjà écarté de nombreux faux positifs, ce seuillage final peut être appliqué de manière plus sécurisée.
Concrètement, la fonction réalise les opérations suivantes :
Elle crée un masque en identifiant les pixels classés comme peupleraies (valeur 1) mais dont la confiance est inférieure au seuil spécifié.
Elle met à jour la classification en remplaçant ces pixels de faible confiance par la valeur 0 (non-peupleraie).
Elle masque également l’image de confiance en mettant à 0 les valeurs correspondant aux pixels exclus.
#' Apply confidence threshold to classification and confidence maps.
#' @param classif Classification data array
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (threshold: Confidence threshold value)
#' @return list of (thresholded_classification, updated_confidence)
apply_confidence_threshold <- function(classif, conf, params) {
# Create mask for low confidence pixels
reclass_mat <- matrix(
c(0, params$threshold, 0, # if value < threshold => 0
params$threshold, Inf, 1), # if value >= threshold => 1
ncol=3, byrow=TRUE)
mask <- classify(x = terra::deepcopy(conf),
reclass_mat,
include.lowest=TRUE)
mask = mask * classif # set value to 0 where classif == 0
message("Values frequency (mask):")
print(knitr::kable(terra::freq(mask, bylayer=FALSE)))
# Apply mask to classification and confidence
new_classif = terra::deepcopy(classif) * mask
new_conf = terra::deepcopy(conf) * mask
if (DEBUG) {
message(" Dans apply_confidence_threshold")
message("Values frequency (old classif):")
print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
message("Values frequency (new classif):")
print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
message("Values frequency (old conf):")
print(knitr::kable(terra::freq(conf, bylayer=FALSE)))
message("Values frequency (new conf):")
print(knitr::kable(terra::freq(new_conf, bylayer=FALSE)))
}
return(list(classif = new_classif, conf = new_conf))
}Fonction 5 : filtrage des groupes de pixels isolés (étape 6)
La fonction apply_sieve_filter élimine les petits groupes de pixels isolés, permettant de nettoyer la classification des artefacts et d’obtenir des entités plus cohérentes avec la réalité terrain.
Concrètement, la fonction réalise les opérations suivantes :
Elle utilise l’algorithme de tamisage (sieve) de rasterio pour identifier les groupes connectés de pixels.
Elle supprime les groupes dont la taille est inférieure au seuil spécifié, en considérant soit une connectivité de 4 (pixels adjacents horizontalement et verticalement), soit une connectivité de 8 (incluant aussi les diagonales).
Elle met à jour l’image de confiance en conséquence, en masquant les valeurs pour les pixels qui ont été supprimés de la classification.
#' Apply sieve filter to remove small isolated patches.
#' Uses terra's sieve filter to remove connected components smaller than the specified threshold.
#' @param classif Classification data array
#' @param conf Confidence data array
#' @param params List of parameters to pass to the function (size_threshold: Minimum size of connected components to keep, connectivity: Connectivity for connected components (4 or 8))
#' @return list of (filtered_classification, updated_confidence)
apply_sieve_filter <- function(classif, conf, params) {
# Apply sieve filter
new_classif = terra::sieve(
x = terra::deepcopy(classif), # distinct copy of classif
threshold = params$size_threshold, # set minimum pixel group size threshold
directions = params$connectivity) # pixels connection : 4 for rook, 8 for queen
# Update confidence
new_conf = conf * new_classif
if (DEBUG) {
message(" Dans apply_sieve_filter")
message("Values frequency (old classif):")
print(knitr::kable(terra::freq(classif, bylayer=FALSE)))
message("Values frequency (new classif):")
print(knitr::kable(terra::freq(new_classif, bylayer=FALSE)))
message("Values frequency (old conf):")
print(knitr::kable(terra::freq(conf, bylayer=FALSE)))
message("Values frequency (new conf):")
print(knitr::kable(terra::freq(new_conf, bylayer=FALSE)))
}
return(list(classif = classif, conf = conf))
}4.1.3.2 Lancement de la chaîne de post-traitement
Définition du pipeline
Ce fichier qui contient la définition sous forme de list des différentes fonction à lancer, avec les paramètres, est définis dans R_Scripts/pipeline_def.R
SLOPE_THRESHOLD <- 30 # degrees
HEIGHT_THRESHOLD <- 300 # centimeters
FILTER_SIZE <- 3
CONFIDENCE_THRESHOLD <- 30
SIEVE_SIZE_THRESHOLD <- 50
SIEVE_CONNECTIVITY <- 4
DEBUG <- TRUE
# Define the process chain as a list of lists
PROCESS_CHAIN <- list(
# Step 1: Binarize image
list(
name = "01_binarize",
fun = binarize_image,
params = list()
),
# Step 2: Apply slope threshold
list(
name = "02_slope_threshold",
fun = apply_threshold_from_aux,
params = list(
aux_index = 1,
threshold = SLOPE_THRESHOLD,
reclass_mat = matrix(c(0, SLOPE_THRESHOLD, 1,
SLOPE_THRESHOLD, Inf, 0),
ncol=3, byrow=TRUE)
)
),
# Step 3: Apply height threshold
list(
name = "03_height_threshold",
fun = apply_threshold_from_aux,
params = list(
aux_index = 2,
threshold = HEIGHT_THRESHOLD,
reclass_mat = matrix(c(0, HEIGHT_THRESHOLD, 0,
HEIGHT_THRESHOLD, Inf, 1),
ncol=3, byrow=TRUE)
)
),
# Step 4: Apply median filter
list(
name = "04_median_filter",
fun = apply_median_filter,
params = list(
filter_size = FILTER_SIZE
)
),
# Step 5: Apply confidence threshold
list(
name = "05_confidence_threshold",
fun = apply_confidence_threshold,
params = list(
threshold = CONFIDENCE_THRESHOLD
)
),
# Step 6: Apply sieve filter
list(
name = "06_sieve_filter",
fun = apply_sieve_filter,
params = list(
size_threshold = SIEVE_SIZE_THRESHOLD,
connectivity = SIEVE_CONNECTIVITY
)
)
)Application du pipeline par tuile
#' Process a single tile through selected steps of the pipeline.
#'
#' @param tile_path Path to the classification tile (str)
#' @param year_folder: Path to the year folder (str)
#' @param process_chain: List of processing steps (list)
#' @param slope_img: Global Slope image (SpatRaster)
#' @param height_img: Glocal Height image (SpatRaster)
#' @param start_step: Name of the first step to run (or None for first step) (str)
#' @param end_step: Name of the last step to run (or None for last step) (str)
#' @param visualize: Whether to create visualizations (boolean)
#' @return Status message or error
process_tile_pipeline <- function(
tiles_data,
year_folder,
process_chain,
slope_img,
height_img,
start_step = NULL,
end_step = NULL,
visualize = FALSE
) {
s_aux_path <- slope_img
h_aux_path <- height_img
classif <- tiles_data[1]
conf <- tiles_data[2]
for (step in process_chain) {
# Apply the step
step$params <- append(step$params, list(aux_path = list(s_aux_path, h_aux_path)))
returned_list = step$fun(classif, conf, params = step$params)
# replace inputs values with produced ones
conf = returned_list$conf
classif = returned_list$classif
}
message("***** END *****")
print(paste0(names(classif)," / ", names(conf) ))
return(sprc(classif,conf))
}A la différence de Python et Dask, qui est capable de parallélisé les traitements avec finalement assez peu de modifications sur le code (dès lors qu’il est bien découpés en fonction), nous devons en R expliciter le graph Targets manuellement, en définissant les embranchements et en positionnant nos appels de fonctions dans des Targets.
C’est pourquoi la suite du fichier _targets.R , qui pilote de façon explicite l’execution de notre chaîne, est plutôt décrite dans la partie Parallélisation de ce TP.
4.2 Environnement Python
4.2.1 Présentation des bibliothèques utilisées
Python est utilisé pour sa richesse en bibliothèques dédiées aux formats de données raster, sa capacité à traiter de gros volumes de données (via Dask notamment), et sa bonne interopérabilité avec les fichiers notamment au formats GeoTIFF et NetCDF.
Les bibliothèques principales sont :
-
xarray: bibliothèque de référence pour manipuler des tableaux multidimensionnels labellisés (comme les cubes temporels ou multi-spectraux). Pour gérer les données géospatiales raster,xarrayest souvent cominée àrioxarray, qui ajoute des fonctionnalités spatiales àxarrayce qui permet un traitement dans un format structuré et compatible avec des outils de calcul distribué (Dask). -
rioxarray: extension dexarraypour gérer les attributs géospatiaux (CRS, extent, etc.) en lisant et écrivant des fichiers raster géoréférencés (GeoTIFF, etc.). -
rasterio: bibliothèque de bas niveau pour lire/écrire des fichiers raster géospatiaux (basée sur GDAL). Elle est utilisée directement ou à traversrioxarray. -
matplotlib: bibliothèque de visualisation utilisée pour afficher des rasters, des profils temporels ou des comparaisons. -
dask: utilisé avecxarraypour paralléliser et optimiser les calculs sur des gros volumes de données raster.
Exemple :
4.2.2 Exécution de la Chaîne
Pour exécuter la chaîne de post-traitement, il faut d’abord importer les bibliothèques nécessaires et définir quelques fonctions utilitaires.
4.2.2.1 Importation des bibliothèques
# Library imports
import os
import numpy as np
from scipy.ndimage import median_filter
import dask
from dask.delayed import delayed
from dask.distributed import Client
import dask.array as da
from time import time
import datetime
from typing import Dict, Any, Callable, List, Optional, Tuple
import rioxarray as rxr
import xarray as xr
import rasterio
from rasterio.features import sieve
import gc
import matplotlib.pyplot as plt4.2.2.2 Définition des chemins et des paramètres par défaut
# Configuration paths
DATA_FOLDER = "./data_folder"
SLOPE_IMAGE = "./data_folder/auxiliary/slope_nouvelle_aquitaine.tif"
HEIGHT_IMAGE = "./data_folder/auxiliary/height_nouvelle_aquitaine.tif"
OUTPUT_DIR = os.path.expanduser("~/sageo_hpc_outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Processing parameters
SLOPE_THRESHOLD = 30
HEIGHT_THRESHOLD = 300
FILTER_SIZE = 3
CONFIDENCE_THRESHOLD = 30
SIEVE_SIZE_THRESHOLD = 50
SIEVE_CONNECTIVITY = 4
# Dask configuration
NB_WORKERS = 2
NB_THREADS = 1
MEMORY_LIMIT = '8GB'4.2.2.3 Fonctions utilitaires
Ces fonctions aident à la manipulation des données raster et sont utilisées par les fonctions principales du pipeline.
def find_years(data_folder: str) -> List[str]:
"""
Find all year directories within the 'years' subdirectory of the data folder.
Args:
data_folder: Root data folder containing 'years' and 'auxiliary' directories
Returns:
List of year directory names
"""
# Construct path to the years directory
years_folder = os.path.join(data_folder, "years")
# Find all subdirectories in the years folder
return [d for d in os.listdir(years_folder)
if os.path.isdir(os.path.join(years_folder, d))]
def find_classification_tiles(folder: str) -> List[str]:
"""Find all classification tiles in a folder."""
classification_folder = os.path.join(folder, "classification")
if not os.path.exists(classification_folder):
return []
return [os.path.join(classification_folder, file)
for file in os.listdir(classification_folder)
if file.endswith(".tif")]
def get_window_from_bounds(bounds, transform, shape):
"""
Calculate window from bounds, transform and shape.
Args:
bounds: Tuple of (minx, miny, maxx, maxy)
transform: Affine transform of the raster
shape: Shape of the raster (bands, height, width)
Returns:
rasterio.windows.Window object
"""
row_start, col_start = ~transform * (bounds[0], bounds[3])
row_stop, col_stop = ~transform * (bounds[2], bounds[1])
row_stop = min(row_stop, shape[1])
col_stop = min(col_stop, shape[2])
height = max(row_stop - row_start, shape[1])
width = max(col_stop - col_start, shape[2])
return rasterio.windows.Window(col_start, row_start, width, height)
def read_auxiliary_data(aux_path, bounds, transform, shape):
"""
Read auxiliary data (slope or height) for the given bounds.
Args:
aux_path: Path to auxiliary raster
bounds: Tuple of (minx, miny, maxx, maxy)
transform: Affine transform of the target raster
shape: Shape of the target raster (bands, height, width)
Returns:
numpy.ndarray: Auxiliary data for the specified region
"""
with rasterio.open(aux_path) as aux_ds:
window = get_window_from_bounds(bounds, transform, shape)
return aux_ds.read(1, window=window)
def determine_active_steps(
process_chain: List[Dict],
start_step: Optional[str] = None,
end_step: Optional[str] = None
) -> Tuple[List[str], List[Dict], int, int]:
"""
Determine which steps to run based on start and end step names.
Args:
process_chain: List of processing step dictionaries
start_step: Name of the first step to run (or None for first step)
end_step: Name of the last step to run (or None for last step)
Returns:
tuple: (all_step_names, active_steps, start_idx, end_idx)
"""
all_step_names = [step["name"] for step in process_chain]
start_idx = all_step_names.index(start_step) if start_step and start_step in all_step_names else 0
end_idx = all_step_names.index(end_step) + 1 if end_step and end_step in all_step_names else len(all_step_names)
return all_step_names, process_chain[start_idx:end_idx], start_idx, end_idx
def generate_report(results, year, output_dir=None):
"""
Generate a summary report of processing results.
Args:
results: List of processing results
year: Year being processed
output_dir: Directory to save the report (default: current directory)
Returns:
str: Formatted report text
"""
# Categorize results
success = [r for r in results if not str(r).startswith("ERROR")]
errors = [r for r in results if str(r).startswith("ERROR")]
# Calculate statistics
total_tiles = len(results)
success_count = len(success)
error_count = len(errors)
success_rate = (success_count / total_tiles * 100) if total_tiles > 0 else 0
# Format the report header with more details
report = f"Processing Report for Year {year}\n"
report += f"=" * 50 + "\n"
report += f"Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
# Add summary statistics
report += f"Summary:\n"
report += f" Total tiles processed: {total_tiles}\n"
report += f" Successfully processed: {success_count} ({success_rate:.1f}%)\n"
report += f" Failed: {error_count}\n\n"
# Add successful tiles section
if success:
report += f"Successfully Processed Tiles:\n"
for i, tile in enumerate(success, 1):
report += f" {i}. {tile}\n"
report += "\n"
# Add error section with more details
if errors:
report += f"Errors:\n"
for i, err in enumerate(errors, 1):
# Extract more details from the error message if possible
error_parts = str(err).split(":", 1)
if len(error_parts) > 1:
tile_name = error_parts[0].replace("ERROR", "").strip()
error_msg = error_parts[1].strip()
report += f" {i}. Tile: {tile_name}\n Error: {error_msg}\n"
else:
report += f" {i}. {err}\n"
# Determine the output path
if output_dir is None:
output_dir = OUTPUT_DIR
report_dir = os.path.join(output_dir, "reports")
os.makedirs(report_dir, exist_ok=True)
report_path = os.path.join(report_dir, f"report_{year}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.txt")
# Write report to file
with open(report_path, "w") as f:
f.write(report)
print(f"Report saved to: {report_path}")
return report4.2.2.4 Fonctions principales du pipeline
Fonction 1 : binarisation (étape 1)
La fonction binarize_image transforme les images de classification et de confiance en utilisant des DataArray de xarray.
Cette structure de données conserve automatiquement les informations géographiques (coordonnées, système de projection) pendant les différentes opérations.
Concrètement, la fonction réalise deux opérations essentielles :
Elle crée une image binaire où seuls les pixels identifiés comme peupleraies (classe 1) reçoivent la valeur 1, tous les autres pixels étant mis à 0. Le résultat est converti au format
uint8(entier 8 bits non signé) réduisant ainsi l’empreinte mémoire et accélérant les traitements ultérieurs.Elle traite l’image de confiance en ne conservant que les valeurs correspondant aux pixels de peupleraies. Ces valeurs sont multipliées par 100 pour les exprimer en pourcentage (0 à 100%) et converties au format
int16(entier 16 bits) offrant un bon compromis entre précision des données et efficacité de traitement.
def binarize_image(
classif: xr.DataArray,
conf: xr.DataArray,
**kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Binarize classification and confidence images.
Creates a binary classification (1 where class=1, 0 elsewhere) and
scales confidence values by 100 where classification is 1.
Args:
classif: Classification data array
conf: Confidence data array
Returns:
Tuple of (binary_classification, scaled_confidence)
"""
binary = (classif == 1).astype("uint8")
confidence_masked = (conf * 100 * binary).astype("int16")
return binary, confidence_maskedFonction 2 : filtrage à partir de données auxilliaires (étapes 2 et 3)
La fonction apply_threshold_from_aux permet d’affiner la classification en utilisant des données auxiliaires comme la pente ou la hauteur de la végétation.
Cette étape permet d’éliminer des pixels faux positifs détéctés à tort dans les peupleraies grâce à l’intégration de connaissances expertes.
Concrètement, la fonction réalise les opérations suivantes :
Elle charge les données auxiliaires (pente ou hauteur) seulement sur l’emprise géographique de la classification, en préservant la résolution spatiale et le géoréférencement.
Elle applique un seuil sur ces données auxiliaires pour créer un masque : - Pour la pente : elle élimine les pixels situés sur des pentes trop fortes (peu propices aux peupleraies) - Pour la hauteur : elle permet d’éliminer des pixels de végétation basse ayant une faible hauteur, peu susceptibles d’être des peupleraies
Elle met à jour la classification en supprimant les pixels qui ne respectent pas les critères définis par le seuil, en les remplaçant par la valeur 0 (non-peupleraie).
Elle masque également l’image de confiance en conséquence, en mettant à 0 les valeurs de confiance pour tous les pixels qui ont été exclus de la classification.
def apply_threshold_from_aux(
classif: xr.DataArray,
conf: xr.DataArray,
aux_path: str,
threshold: int,
filter_out_high_values: bool = True, # If True, remove values >= threshold
**kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Apply threshold to classification based on auxiliary data.
Reads auxiliary data (slope or height) and sets classification to 0
where auxiliary values exceed the threshold.
Args:
classif: Classification data array
conf: Confidence data array
aux_path: Path to auxiliary raster (slope or height)
threshold: Threshold value to apply
filter_out_high_values: If True, remove pixels where aux
threshold;
if False, remove pixels where aux < threshold
Returns:
Tuple of (thresholded_classification, updated_confidence)
"""
# Read auxiliary data for the region
aux_array = read_auxiliary_data(
aux_path,
classif.rio.bounds(),
classif.rio.transform(),
classif.shape
)
# Apply threshold
new_classif = classif.copy(deep=True)
class_array = new_classif.values[0].copy()
# Create the mask based on the threshold comparison
if filter_out_high_values:
mask = aux_array >= threshold # Remove pixels where aux >= threshold
else:
mask = aux_array < threshold # Remove pixels where aux < threshold
# Apply the mask
class_array[mask] = 0
new_classif.values[0] = class_array
# Update confidence
new_conf = conf.copy(deep=True)
new_conf = new_conf.where(new_classif > 0, 0)
return new_classif, new_confFonction 3 : filtrage médian
La fonction apply_median_filter applique un filtre médian sur les valeurs de confiance pour réduire le bruit et homogénéiser les résultats.
Concrètement, la fonction réalise les opérations suivantes :
Elle conserve l’image de classification d’origine tout en créant une copie pour les modifications ultérieures.
Elle applique un filtre médian sur l’image de confiance avec une fenêtre de taille paramétrable, ce qui permet de lisser les valeurs aberrantes tout en préservant les contours des objets.
Elle met à jour la classification en supprimant les pixels dont la confiance est devenue nulle après filtrage, assurant ainsi la cohérence entre les deux couches.
def apply_median_filter(
classif: xr.DataArray,
conf: xr.DataArray,
filter_size: int,
**kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Apply median filter to confidence values.
Applies a median filter to the confidence array and masks values
where classification is 0.
Args:
classif: Classification data array
conf: Confidence data array
filter_size: Size of the median filter window
Returns:
Tuple of (classification, filtered_confidence)
"""
# Keep classification unchanged
new_classif = classif.copy(deep=True)
# Apply median filter to confidence
conf_array = conf.values[0].copy()
filtered_array = median_filter(conf_array, size=filter_size)
# Create new confidence array
new_conf = conf.copy(deep=True)
new_conf.values[0] = filtered_array
# Mask confidence values where classification is 0
new_conf = new_conf.where(new_classif > 0, 0)
# Update classification to match - set to 0 where confidence is 0
new_classif_array = new_classif.values[0]
new_classif_array[new_conf.values[0] == 0] = 0
new_classif.values[0] = new_classif_array
return new_classif, new_confFonction 4 : application d’un seuil de confiance (étape 5)
La fonction apply_confidence_threshold permet d’éliminer les pixels classés comme peupleraies mais avec une faible confiance.
Cette étape, appliquée après les filtres précédents, améliore la fiabilité globale de la classification avec un risque limité d’erreur.
En effet, ce seuillage a été délibérément placé en fin de chaîne car certaines jeunes peupleraies peuvent initialement présenter des valeurs de confiance plus faibles, et les éliminer dès le début aurait pu conduire à une sous-détection de ces peuplements.
Les étapes précédentes ayant déjà écarté de nombreux faux positifs, ce seuillage final peut être appliqué de manière plus sécurisée.
Concrètement, la fonction réalise les opérations suivantes :
Elle crée un masque en identifiant les pixels classés comme peupleraies (valeur 1) mais dont la confiance est inférieure au seuil spécifié.
Elle met à jour la classification en remplaçant ces pixels de faible confiance par la valeur 0 (non-peupleraie).
Elle masque également l’image de confiance en mettant à 0 les valeurs correspondant aux pixels exclus.
def apply_confidence_threshold(
classif: xr.DataArray,
conf: xr.DataArray,
threshold: int,
**kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Apply confidence threshold to classification and confidence maps.
Sets classification to 0 where confidence is below the threshold.
Args:
classif: Classification data array
conf: Confidence data array
threshold: Confidence threshold value
Returns:
Tuple of (thresholded_classification, thresholded_confidence)
"""
# Create mask for low confidence pixels
mask = (conf < threshold) & (classif == 1)
# Apply mask to classification and confidence
new_classif = classif.where(~mask, 0)
new_conf = conf.where(~mask, 0)
return new_classif, new_confFonction 5 : filtrage des groupes de pixels isolés (étape 6)
La fonction apply_sieve_filter élimine les petits groupes de pixels isolés, permettant de nettoyer la classification des artefacts et d’obtenir des entités plus cohérentes avec la réalité terrain.
Concrètement, la fonction réalise les opérations suivantes :
Elle utilise l’algorithme de tamisage (sieve) de rasterio pour identifier les groupes connectés de pixels.
Elle supprime les groupes dont la taille est inférieure au seuil spécifié, en considérant soit une connectivité de 4 (pixels adjacents horizontalement et verticalement), soit une connectivité de 8 (incluant aussi les diagonales).
Elle met à jour l’image de confiance en conséquence, en masquant les valeurs pour les pixels qui ont été supprimés de la classification.
def apply_sieve_filter(
classif: xr.DataArray,
conf: xr.DataArray,
size_threshold: int,
connectivity: int,
**kwargs
) -> Tuple[xr.DataArray, xr.DataArray]:
"""
Apply sieve filter to remove small isolated patches.
Uses rasterio's sieve filter to remove connected components
smaller than the specified threshold.
Args:
classif: Classification data array
conf: Confidence data array
size_threshold: Minimum size of connected components to keep
connectivity: Connectivity for connected components (4 or 8)
Returns:
Tuple of (filtered_classification, updated_confidence)
"""
# Get classification as numpy array
class_array = classif.values[0].copy()
# Apply sieve filter
filtered_array = sieve(class_array, size_threshold, connectivity=connectivity)
# Create new classification array
new_classif = classif.copy(deep=True)
new_classif.values[0] = filtered_array
# Update confidence
new_conf = conf.where(new_classif > 0, 0)
return new_classif, new_conf4.2.2.5 Fonction de visualisation
def visualize_processing_steps(original_classif, step_results, tile_name):
"""
Visualize processing steps for a tile.
Args:
original_classif: Original classification data array
step_results: List of dictionaries with intermediate results
tile_name: Name of the tile for display
Returns:
matplotlib.figure.Figure: Figure with visualizations
"""
# Create a figure with one row per step
n_steps = len(step_results)
n_cols = 3
n_rows = (n_steps + 1 + n_cols - 1) // n_cols
fig, axes = plt.subplots(n_rows, n_cols, figsize=(n_cols * 4, n_rows * 3.5))
axes = axes.flatten()
# Subsample for display
max_size = 1000
if original_classif.shape[1] > max_size or original_classif.shape[2] > max_size:
scale_factor = max(original_classif.shape[1], original_classif.shape[2]) // max_size + 1
display_data = original_classif.values[0][::scale_factor, ::scale_factor]
else:
display_data = original_classif.values[0]
# Display original
axes[0].imshow(display_data, cmap='binary')
axes[0].set_title("Original")
axes[0].set_xticks([])
axes[0].set_yticks([])
# Calculate initial percentage
poplar_pixels = np.sum(original_classif.values[0] == 1)
total_pixels = original_classif.values[0].size
percent = (poplar_pixels / total_pixels) * 100
axes[0].set_xlabel(f"Total pixels: {total_pixels}")
# Variable to store previous pixel count
prev_poplar_pixels = poplar_pixels
# Display each step
for i, step in enumerate(step_results):
# Subsample for display
if step["classif"].shape[1] > max_size or step["classif"].shape[2] > max_size:
scale_factor = max(step["classif"].shape[1], step["classif"].shape[2]) // max_size + 1
display_data = step["classif"].values[0][::scale_factor, ::scale_factor]
else:
display_data = step["classif"].values[0]
# Display image
axes[i+1].imshow(display_data, cmap='binary')
axes[i+1].set_title(step["name"].split("_")[1] if "_" in step["name"] else step["name"])
axes[i+1].set_xticks([])
axes[i+1].set_yticks([])
# Calculate percentage and reduction
poplar_pixels = np.sum(step["classif"].values[0] > 0)
total_pixels = step["classif"].values[0].size
percent = (poplar_pixels / total_pixels) * 100
if prev_poplar_pixels > 0:
reduction = 100 - (poplar_pixels / prev_poplar_pixels * 100)
axes[i+1].set_xlabel(f"Poplar pixels: {poplar_pixels} (-{reduction:.1f}%)")
else:
axes[i+1].set_xlabel(f"Poplar pixels: {poplar_pixels}")
prev_poplar_pixels = poplar_pixels
# Mask unused axes
for j in range(7, len(axes)):
axes[j].axis('off')
# Add global title
plt.suptitle(f"Post-processing results for tile {tile_name}", fontsize=14)
plt.tight_layout(rect=[0, 0, 1, 0.95])
return fig4.2.2.6 Lancement de la chaîne de post-traitement
Définition du pipeline
# Define the processing chain
PROCESS_CHAIN = [
{
"name": "01_binarize",
"function": binarize_image,
"params": {}
},
{
"name": "02_slope_threshold",
"function": apply_threshold_from_aux,
"params": {
"aux_path": SLOPE_IMAGE,
"threshold": SLOPE_THRESHOLD,
"filter_out_high_values": True
}
},
{
"name": "03_height_threshold",
"function": apply_threshold_from_aux,
"params": {
"aux_path": HEIGHT_IMAGE,
"threshold": HEIGHT_THRESHOLD,
"filter_out_high_values": False
}
},
{
"name": "04_median_filter",
"function": apply_median_filter,
"params": {
"filter_size": FILTER_SIZE
}
},
{
"name": "05_confidence_threshold",
"function": apply_confidence_threshold,
"params": {
"threshold": CONFIDENCE_THRESHOLD
}
},
{
"name": "06_sieve_filter",
"function": apply_sieve_filter,
"params": {
"size_threshold": SIEVE_SIZE_THRESHOLD,
"connectivity": SIEVE_CONNECTIVITY
}
}
]Application du pipeline par tuile
def process_tile_pipeline(
tile_path: str,
year_folder: str,
process_chain: List[Dict],
start_step: Optional[str] = None,
end_step: Optional[str] = None,
visualize: bool = False,
output_dir: Optional[str] = None
) -> str:
"""
Process a single tile through selected steps of the pipeline.
Args:
tile_path: Path to the classification tile
year_folder: Path to the year folder
process_chain: List of processing steps
start_step: Name of the first step to run (or None for first step)
end_step: Name of the last step to run (or None for last step)
visualize: Whether to create visualizations
Returns:
str: Status message or error
"""
# Use OUTPUT_DIR if no custom output directory is provided
if output_dir is None:
output_dir = OUTPUT_DIR
# Prepare input paths
conf_path = tile_path.replace("classification", "confiance")
# Get the tile name from the classification file name
tile_name = os.path.basename(tile_path).split("_")[1]
print(f"Starting pipeline for tile: {tile_name}")
# Determine which steps to run
_, active_steps, _, _ = determine_active_steps(process_chain, start_step, end_step)
# Load data
classif = rxr.open_rasterio(tile_path)
conf = rxr.open_rasterio(conf_path)
# Save original classification for visualization
classif_original = classif.copy() if visualize else None
# Store intermediate results for visualization if needed
step_results = [] if visualize else None
# Process each step
for step in active_steps:
print(f" Processing {step['name']} for {tile_name}")
# Apply the step
classif, conf = step["function"](classif, conf, **(step["params"] or {}))
# Store intermediate result if visualizing
if visualize:
step_results.append({
"name": step["name"],
"classif": classif.copy(),
"conf": conf.copy()
})
gc.collect()
# Write results
# Create output folders
# Create output folders
year = os.path.basename(year_folder)
year_output_dir = os.path.join(output_dir, year)
# Create year output directory
os.makedirs(year_output_dir, exist_ok=True)
output_classif_folder = os.path.join(year_output_dir, "classification")
output_conf_folder = os.path.join(year_output_dir, "confiance")
os.makedirs(output_classif_folder, exist_ok=True)
os.makedirs(output_conf_folder, exist_ok=True)
# Write files
classif.rio.to_raster(os.path.join(output_classif_folder, os.path.basename(tile_path)),
compress="lzw")
conf.rio.to_raster(os.path.join(output_conf_folder, os.path.basename(conf_path)),
compress="lzw")
# Create visualization if requested
if visualize:
# Create visualization folder
vis_folder = os.path.join(year_output_dir, "visualization")
os.makedirs(vis_folder, exist_ok=True)
# Generate visualization
fig = visualize_processing_steps(
classif_original,
step_results,
tile_name
)
# Save image
fig.savefig(os.path.join(vis_folder, f"{tile_name}_steps.png"),
dpi=200,
bbox_inches='tight')
plt.close(fig)
# Clean up
classif, conf = None, None
gc.collect()
print(f"Completed pipeline for tile: {tile_name}")
return tile_nameApplication du pipeline pour toutes les tuiles et les années avec map_blocks
def run_pipeline_years_map_blocks(
data_folder: str,
process_chain: List[Dict],
start_step: Optional[str] = None,
end_step: Optional[str] = None,
n_workers: int = NB_WORKERS,
n_threads: int = NB_THREADS,
visualize: bool = True,
memory_limit: str = MEMORY_LIMIT,
output_dir: Optional[str] = None
):
"""
Run the processing pipeline for all years and tiles using Dask's map_blocks approach.
"""
# Use OUTPUT_DIR if no custom output directory is provided
if output_dir is None:
output_dir = OUTPUT_DIR
# Ensure output directory exists
os.makedirs(output_dir, exist_ok=True)
total_start = time()
print(f"Starting pipeline with {n_workers} workers and {n_threads} threads per worker (map_blocks implementation)")
print(f"Processing steps: {start_step or 'beginning'} to {end_step or 'end'}")
# Create Dask client with specified configuration
client = Client(
processes=True,
n_workers=n_workers,
threads_per_worker=n_threads,
memory_limit=memory_limit
)
# Find all years
years = find_years(data_folder)
print(f"Found {len(years)} years to process")
# Process each year
for year in years:
year_folder = os.path.join(data_folder, "years", year)
year_output_dir = os.path.join(output_dir, year)
# Create year output directory
os.makedirs(year_output_dir, exist_ok=True)
# Find input tiles
input_folder = os.path.join(year_folder, "inputs")
tile_paths = find_classification_tiles(input_folder)
print(f"Processing {len(tile_paths)} tiles for year {year}")
# Define a function to process tiles by index
def process_tile_by_index(block):
results = []
for idx in block:
idx = int(idx)
if idx < len(tile_paths):
result = process_tile_pipeline(
tile_path=tile_paths[idx],
year_folder=year_folder,
process_chain=process_chain,
start_step=start_step,
end_step=end_step,
visualize=visualize,
output_dir=output_dir
)
results.append(result)
else:
results.append(None)
return np.array(results, dtype=object)
# Create a dummy array with indices for each tile
dummy_array = da.arange(len(tile_paths), chunks=1)
# Apply map_blocks to process all tiles
results_array = dummy_array.map_blocks(
process_tile_by_index,
dtype=object
)
# Compute results
year_results = results_array.compute()
# Generate report for the year
generate_report(year_results.tolist(), year, output_dir=year_output_dir)
print(f"Year {year} completed: {len(year_results)} tiles processed")
print(f"Total processing time: {time() - total_start:.2f} seconds")
# Ensure client is closed even if an error occurs
client.close()
print("Dask client closed")4.2.3 Map_blocks versus Futures
La parallélisation du traitement des tuiles raster peut être réalisée de différentes manières avec Dask.
Notre implémentation utilise map_blocks, une approche de haut niveau qui applique une fonction à des blocs de données indépendants.
Dans le code utilisé, chaque bloc contient un indice unique (chunks=1) qui pointe vers une tuile raster spécifique à traiter :
Le paramètre chunks définit la taille des blocs traités en parallèle.
Par exemple, avec chunks=2, chaque bloc contiendrait 2 indices consécutifs, ce qui réduirait le nombre de tâches parallèles.
Des valeurs plus élevées peuvent être efficaces pour de petites tuiles mais risquent de compromettre l’optimisation de la distribution des ressources entre les workers.
Ici, chque bloc est indépendant car : - Le traitement d’une tuile ne dépend pas du résultat du traitement d’une autre tuile - Chaque tuile peut être chargée, traitée et sauvegardée séparément - Il n’y a pas de communication nécessaire entre les traitements des différentes tuiles
Dask exploite cette indépendance pour paralléliser les traitements :
- Distribution : chaque bloc peut être envoyé à n’importe quel worker disponible
- Parallélisation : plusieurs blocs peuvent être traités simultanément sur différents cœurs ou machines
- Tolérance aux pannes : si le traitement d’un bloc échoue, seul ce bloc doit être recalculé
Contrairement aux futures explicites créées avec dask.delayed(), map_blocks abstrait la gestion des tâches individuelles en traitant automatiquement la distribution du travail. Dans notre pipeline, nous créons un tableau d’indices (dummy_array = da.arange(len(tile_paths), chunks=1)) où chaque indice correspond à une tuile à traiter. La fonction map_blocks applique ensuite process_tile_by_index à chaque bloc (contenant un seul indice), permettant ainsi de traiter chaque tuile indépendamment et en parallèle.
Une approche alternative utilisant des futures explicites. Une future dans Dask est essentiellement une promesse de résultat : un objet qui représente un calcul qui sera exécuté ultérieurement. Une implémentation avec des futures ressemblerait à ceci :
def run_pipeline_years(
data_folder: str,
process_chain: List[Dict],
start_step: Optional[str] = None,
end_step: Optional[str] = None,
n_workers: int = NB_WORKERS,
n_threads: int = NB_THREADS,
visualize: bool = True,
memory_limit: str = MEMORY_LIMIT,
):
"""
Run the processing pipeline for all years and tiles.
"""
total_start = time()
print(f"Starting pipeline with {n_workers} workers and {n_threads} threads per worker")
print(f"Processing steps: {start_step or 'beginning'} to {end_step or 'end'}")
# Create Dask client with specified configuration
client = Client(
processes=True,
n_workers=n_workers,
threads_per_worker=n_threads,
memory_limit=memory_limit
)
# Find all years
years = find_years(data_folder)
print(f"Found {len(years)} years to process")
# Create a list to store all futures
all_futures = []
# Submit tasks for all tiles across all years
for year in years:
year_folder = os.path.join(data_folder, "years", year)
# Find input tiles
input_folder = os.path.join(year_folder, "inputs")
tile_paths = find_classification_tiles(input_folder)
print(f"Submitting {len(tile_paths)} tiles for year {year}")
# Submit each tile as a separate task
for tile_path in tile_paths:
future = client.submit(
process_tile_pipeline,
tile_path=tile_path,
year_folder=year_folder,
process_chain=process_chain,
start_step=start_step,
end_step=end_step,
visualize=visualize
)
# Store the future with its year for later reporting
all_futures.append((year, future))
# Process results as they complete
year_results = {}
for year, future in all_futures:
result = future.result() # This will wait for the task to complete
# Initialize year results if needed
if year not in year_results:
year_results[year] = []
# Add result to the appropriate year
year_results[year].append(result)
# Generate report when all tiles for a year are done
if len(year_results[year]) == len([f for y, f in all_futures if y == year]):
report_dir = os.path.join(data_folder, "years", year, "outputs", "reports")
generate_report(year_results[year], year, output_dir=report_dir)
print(f"Year {year} completed: {len(year_results[year])} tiles processed")
print(f"Total processing time: {time() - total_start:.2f} seconds")
# Ensure client is closed even if an error occurs
client.close()
print("Dask client closed")
if __name__ == '__main__':
# Run the pipeline with all steps
run_pipeline_years(
data_folder=DATA_FOLDER,
process_chain=PROCESS_CHAIN,
n_workers=NB_WORKERS,
n_threads=NB_THREADS,
visualize=True,
)Cette approche avec les futures explicites fonctionne différemment de map_blocks. Voici comment elle opère en détail:
- Chaque tuile est soumise individuellement au planificateur Dask via
client.submit()oudask.delayed()qui enregistre la tâche sans l’exécuter immédiatement - L’exécution est déclenchée soit par future.result() pour une future individuelle soit par dask.compute(*futures) pour calculer plusieurs futures en parallèle. Les workers disponibles traitent alors ces tâches simultanément et les résultats sont récupérés une fois les calculs terminés.
Cette approche avec futures offre plus de contrôle sur chaque tâche individuelle et permet de construire des graphes de dépendances plus complexes, mais nécessite une gestion plus explicite du parallélisme. Dans notre pipeline, nous avons privilégié map_blocks pour sa concision et son intégration naturelle avec les tableaux Dask, permettant ainsi de traiter chaque tuile indépendamment et en parallèle tout en bénéficiant des optimisations internes de Dask.