5 Parallélisation

5.1 Stratégies de parallélisation

Sachant que le lot de traitements définis dans le pipeline peuvent être réalisé indépendamment sur chaque tuiles, deux stratégies de division du travail viennent à l’esprit.

5.1.1 Pour chaque tuile

Dans ce cas là, chacune des 27 tuiles de l’exemple va voir s’appliquer l’ensemble des traitements successivement.

5.1.2 Par lots de n tuiles

Dans ce cas là, on applique un traitement au 27 tuiles avant de passer au traitement suivant.

5.1.3 Par années

Comme nous traitons les années de notre pipeline de traitement de façon indépendante, nous pouvons ajouter assez simplement un niveau supplémentaire de parallélisation.

5.2 Parallélisation du code avec R

5.2.1 Targets

Section adaptée de l’atelier Des notebooks plus reproductibles avec targets

Lorsqu’un script est exécuté, l’ensemble du code est exécuté. Les données sont retéléchargées, relues, traitées à nouveau alors que le but était peut-être seulement de corriger une faute de frappe dans le titre d’un graphique. Tout cela est gourmand en ressources, que ce soit les ressources locales ou celles de serveurs. C’est aussi gourmand en temps. Un script simple traitant une image peut mettre quelques minutes à être compilé sur une machine de bureau moyennement puissante. L’impact est réduit mais qu’en est-il pour des traitements prenant plusieurs dizaines minutes, voire plusieurs heures comme pour traiter nos deux années de tuiles ? targets peut être vu comme l’équivalent de Make dans l’environnement R. Son but est de rationaliser l’exécution de la chaîne de traitement en n’exécutant que les parties qui doivent être calculées à nouveau. Pour cela, targets stocke métadonnées et sorties dans un dossier _targets/.

Enfin targets peut également faciliter le calcul haute performance (HPC) en parallélisant et distribuant les calculs.

targets fait partie de la communauté ROpenSci (packages R pour la science).

Étapes:

  1. créer la structure
  2. créer les fonctions de traitements et les targets associées
  3. exécuter le pipeline

5.2.1.1 Mise en place de la structure

targets pour fonctionner va lire le fichier _targets.R situé la racine du projet. Les différentes cibles (targets) définies dans ce fichier vont appeler des fonctions personnalisées, stockées par convention dans le fichier R/functions.R.

├── _targets.R
├── R/
│   ├── functions.R

Dans cet atelier, les fonctions sont stockées dans plusieurs scripts R placés dans le dossier R_Scripts. Par exemple, la fonction find_years() se trouve dans le fichier R_Scripts/utils.R

#' find_years
#' @description Find all year directories within the 'years' subdirectory of the data folder.
#' @param data_folder Root data folder containing 'years' and 'auxiliary' directories
#' @return character vector of year directory names

find_years <- function(data_folder) {

    #Find all subdirectories in the years folder
    years = list.files(
      file.path(data_folder, "years"), # construct path to years folder
      full.names = TRUE, # not full path, only year folder name
      recursive = FALSE,
      include.dirs = TRUE,
      all.files = FALSE) # don't look to subfolders
    return(as.list(years))
}

la ligne source("./R_Scripts/utils.R") placée au début du fichier _targets.R va chargé le code contenu dans ce fichier.

5.2.2 Création des targets

Les targets à exécuter sont stockées dans un objet de type list situé dans _targets.R et devront appeler la fonction tar_targets qui prend 2 arguments:

  • le nom de la target, qui servira à rappeler les résultats
  • la command a exécuté.
# Exemple tiré de la documentation de targets: https://books.ropensci.org/targets/targets.html#dependencies 

list(
  tar_target(
    name = second_target,
    command = outer_function(first_target) + 2
  ),
  tar_target(
    name = first_target,
    command = 2
  )
)

Notez que l’ordre des targets dans la liste n’est pas important. Lors de l’appel de tar_make(), targets évalue l’arbre d’exécution afin que les étapes soient faites dans l’ordre nécessaire.

Par commodité, nous avons choisi d’utiliser la syntaxe de targetypes, un package complémentaire à targets.

tar_plan(
    
   tar_target(list_years,
          find_years("/home/2025021/PARTAGE/data_folder")),

    tar_target(list_tiles,
               get_raster_path("/home/2025021/PARTAGE/data_folder", list_years),
               pattern=map(list_years)),

[...]

)

Ici, les targets sont déclarées dans la fonction tar_plan(). Ce n’est pas le cas ici mais cette syntax permet de se passer des appels à tar_target(). Par exemple, la première ligne pourrait s’écrire comme ça :

tar_plan(
    
   list_years = find_years("/home/2025021/PARTAGE/data_folder")),
  tar_target(list_tiles,
               get_raster_path("/home/2025021/PARTAGE/data_folder", list_years),
               pattern=map(list_years)),

[...]

)

5.2.2.1 Exécuter le pipeline

La commande tar_make() va lancer l’analyse de l’arbre d’exécution et l’exécution des targets identifiées.

Pour chaque cible dans le plan d’exécution, targets affichera un message d’information:

  • Dispatched : la cible sera exécutée
  • Skipped : la cible n’a pas besoin d’être exécutée
  • Error : les erreurs éventuelles (stop l’exécution)

tar_invalidate(<nom de la target>) permet d’invalider une étape pour la relancer si nécessaire. tar_invalidate(everything()) invalide toute la chaîne de traitement. tar_prune() et tar_destroy() permettent le nettoyage plus ou moins total de l’espace de stockage.

5.2.2.2 Limitations de targets

targets est un excellent outil pour renforcer la chaîne de traitement en revanche il nécessite de repenser la base de code pour la transformer en fonctions si cela n’a pas été prévu dès le départ. Ce processus n’est pas évident et peut demander un peu de pratique de la programmation fonctionnelle et demande du temps supplémentaire. De plus, targets gère mal les fichiers géospatiaux mais le package geotargets de Nicholas Tierney permet de pallier à cela.

5.2.3 GeoTargets

Les très nombreux formats spatiaux disposent de caractéristiques supplémentaires par rapport à des formats plus classiques.

On pourra citer par exemple la présence de fichiers métadata compagnon (projection, extent, resolution, etc.), du type aux ou aux.xml par exemple, qui interviennent lors de l’écriture ou de la lecture des fichiers.

Jusqu’à l’arrivée de GeoTargets développé par l’équipe de Nicholas Tierney, Eric Scott et Andrew Brown (Tierney, Scott, and Brown 2024), l’écriture et la lecture de formats spatiaux avec R, Terra et Targets s’avère assez compliqué.

Cela se traduit par la mise à disposition par GeoTargets de Targets spécifique :

  • tar_terra_rast : gestion de format rasters de classe SpatRaster
  • tar_terra_vect : gestion de formats vecteurs de type SpatVector
  • tar_terra_sprc : gestion de collections de rasters de classe SpatRasterCollection
  • tar_terra_vrt : gestion de raster au format GDAL Virtual Dataset (VRT)

Pour le reste le fonctionnement s’inscrit dans la continuité de Targets, avec quand même une différence, c’est que l’entrée d’un fichier spatial dans un workflow implique par defaut la recopie dans l’espace de travail _targets/objects puis sa réécriture. Ce qui n’est pas sans poser de problème lorsque un fichier raster pèse plusieurs dizaine de Gigaoctets, comme en témoigne la discussion suivante en lien avec notre workflow…

Sachant la discussion évoqués ci-dessus, les possibilités pour adapter notre code sont les suivantes :

  • compresser les raster en utilisant les options de GDAL ,
  • dégrader la qualité des raster en manipulant leur datatype,
  • découper les rasters en tuiles de même taille que nos tuilles
  • s’appuyer sur le format VRT de Gdal.

Nous avons opté pour la simplicité du VRT, même si il faudra rester vigileant dans son utilisation dans un environnement parallèle et multi-utilisateur. Outre le fait qu’il s’agit d’un xml qui décrit un pointeur vers un fichier stocké localement, le VRT propose de la lazy evaluation, c’est à dire une évaluation différé des traitements, le raster étant appelé seulement lorsqu’un traitement est nécessaire.

Dans un deuxième temps, et pour donner une alternative, nous avons également fourni une solution de pipeline R avec des tuiles prédécoupés des rasters pour Height et Slope.

Les différentes étapes du Pipeline en R et en Python sont déjà décrites dans la partie @ref(#chaine-de-traitement), nous détaillons ici les spécificités du workflow pour sa parallélisation.

5.2.4 Crew

Crew et Crew.clusters sont des librairies R qui permet de distribuer toute ou partie des computations ayant lieu dans notre le workflow =Targets= sur des environnements de calcul distribués différents.

Un controleur Crew accepte des tâches (tasks), retourne des résultats, et surtout lance des Workers. Ces Workers execute de façon autonome, en asynchrone, des tasks depuis et sur la machine locale (crew_controller_local), ou bien depuis la machine locale vers des noeuds de Cluster classique, par exemple avec SLURM(crew_controller_slurm).

5.2.5 Distribution des calculs

5.2.5.1 En local

Le mesocentre du CRIANN, et à l’avenir peut être la plateforme pédagogique de MesoNet, nous offre l’environnement Jupyter pour déployer le TP avec un kernel R et un kernel Python.

Des comptes génériques allant du login geomat02 à geomat21 ont été créés pour l’ensemble des participant.e.s de ce TP, ils permettent de se connecter à votre espace Jupyter à l’adresse suivante : https://austral-hub.criann.fr/hub/home

Vous devez ensuite selectionner un Template, c’est à dire une capacité Cpu et Mémoire, qui portera le Notebook.

Pour ce TP, le template avec 32 ou 48 coeurs devrait suffire, sachant que la contrainte pouvant limiter les calculs dans le cas des Rasters c’est plutôt l’utilisation mémoire que le nombre de coeurs.

Dans notre fichier targets.R nous ajoutons les informations nécessaires au lancement et au suivi de plusieurs workers:

  • le package R autometric permet d’écrire des logs dans le répertoire
  • crew_controller_local définit le nombre de workers et les options de métriques rapportés dans les logs
controller <- crew_controller_local(
  workers = 4,
    garbage_collection=TRUE,
  options_metrics = crew_options_metrics(
    path = "worker_log_directory/",
    seconds_interval = 1
  )
)

if (tar_active()) {
  controller$start()
  log_start(
    path = "log.txt",
    seconds = 1,
    pids = controller$pids()
  )
}

Le nombre de Workers dans la fonction crew_controller_local (4 ici) détermine le nombre de processus R externe qui pourront calculer les Targets du workflow. En faisant cela le processus R principal de notre programme délégue une partie des calculs à ces Workers. Chacun de ces workers va accueillir au fil de l’eau une liste de Targets à traiter, c’est un fonctionnement classique de file (queue) tel qu’on la croise régulièrement en informatique.

Dans la suite du fichier, on définit la fonction tar_option_set qui fixe les options globales à Targets.

tar_option_set(controller = controller, storage = "worker", retrieval = "worker", memory = "auto")

Ces options ne sont pas évidentes à manipuler et peuvent avoir un impact important sur votre workflow. Elles peuvent être fixé de façon générale pour l’ensemble des Targets du pipeline, ou alors de façon plus individuelle pour chaque Targets.

  • l’option storage avec la valeur worker ici que les données sont délégués et gérés par les Workers et non plus le main R script.
  • l’option retrieval avec la valeur worker indique qu’il faut charger les dépendances précédentes.
  • l’option memory permet de s’adapter dans la gestion mémoire, par exemple les embranchements dynamiques sont un cas particulier ou la mémoire doit plutôt être persistent pour éviter la relecture en boucle des données liés à la combinatoire de l’embranchement.

Faites attentions aux releases de la librairie Targets, il y a des changements qui peuvent être important entre deux version. C’est le cas pour ce TP, entre la version utilisé pour prototypé sur Onyxia (Targets v1.11.3), et la version utilisé dans l’environnement mis à disposition par le CRIANN (Targets v1.10.1) les options passés par défaut ont été modifiés : https://github.com/ropensci/targets/releases/tag/1.11.0

5.2.6 SLURM

5.2.7 Workflow

Dans le cas ou nous utilisons les deux fichiers auxilliaires sans découpage, nous définissons deux cibles tar_target au format file.

tar_plan(
  tar_target(
      height_file,
      file.path("/home/2025021/PARTAGE/data_folder/auxiliary/height_nouvelle_aquitaine.tif"),
      format = "file",
      ),
      
  tar_target(
      slope_file,
      file.path("/home/2025021/PARTAGE/data_folder/auxiliary/slope_nouvelle_aquitaine.tif"),
      format = "file",
      )
)

En indiquant ici le format = "file" on indique à Targets que nous allons utiliser des fichiers extérieurs. Ces fichiers seront suivi, mais pas gérés directement par Targets.

Ces deux cibles sont ensuite passé à GeoTargets pour être utilisé au format VRT via la cible tar_terra_vrt.

En fonction des capacités en mémoire de la machine, vous pouvez définir des stratégies différentes en termes d’utilisation de la mémoire tout au long du pipeline.

On propose de garder en local et en mémoire ces rasters de taille importante pour ne pas avoir à les relire à chaque fois.

   tar_terra_vrt(load_height,
                   terra::rast(height_file), # charge le fichier
                   memory="persistent",
                   repository="local"
                  ),
    
    tar_terra_vrt(load_slope,
                   terra::rast(slope_file),
                    memory="persistent",
                   repository = "local",
                  ),

La partie suivante, toujours dans la portée de tar_plan décrit le branchement dynamique qui correspond à la stratégie définie en début de cette partie.

   tar_target(list_years,
          find_years("/home/2025021/PARTAGE/data_folder")),

    tar_target(list_tiles,
               read_raster(list_years),
               pattern=map(list_years)),

    tar_target(tile_pipeline,
               list_tiles,
               iteration="list"),
    
    # https://github.com/ropensci/targets/issues/1426
    tar_terra_sprc(tile_pipeline_disagregated,
       load_raster(tile_pipeline),
       pattern=map(tile_pipeline),
       retrieval = "main"
      ),

La première target appelle la fonction find_years dans R_Script/Utils.R, celle-ci renvoie la liste des années présente dans le dossier data_folder

#' find_years
#' @description Find all year directories within the 'years' subdirectory of the data folder.
#' @param data_folder Root data folder containing 'years' and 'auxiliary' directories
#' @return character vector of year directory names

find_years <- function(data_folder) {

    #Find all subdirectories in the years folder
    years = list.files(
      file.path(data_folder, "years"), # construct path to years folder
      full.names = TRUE, # not full path, only year folder name
      recursive = FALSE,
      include.dirs = TRUE,
      all.files = FALSE) # don't look to subfolders
    return(as.list(years))
}

La deuxième target applique la fonction read_raster() de /R_Scripts/utils.R. Celle-ci renvoie pour chaque fichier de classification ./classification/xxx.tif de tuile une liste contenant le tif de classification(./classification/xxx.tif), et le tif de confiance (./confiance/xxx.tif) . Il s’agit donc d’une liste de liste.

#' find_classification_tiles
#' @description Find all classification tiles in a folder
#' @param folder path to folder to look into
find_classification_tiles <- function(folder){
    classification_folder = file.path(folder, "inputs", "classification")
    # Find all TIF file in classification folder and return relative paths
    tif <- Sys.glob(file.path(classification_folder, "*.tif"))
    r <- lapply(tif, function(x) list (x, gsub("classification","confiance",x)))
    return(r)
}

read_raster <- function(input_folder) {
    # Prepare input paths
    tiles_and_conf <- find_classification_tiles(input_folder)
    return(tiles_and_conf)
}

Le pattern appliqué sur cette target est de type map, ce qui va déterminer un branchement dynamique avec un nombre de branches équivalente au nombre d’années présente dans le dossier.

Sachant que nous avons ici 2017 et 2018, on intuite qu’il y aura 2 branches, mais pour Targets c’est une propriété qui est définit au moment ou le workflow s’execute, et non pas à l’avance, d’où son statut dynamique.

La troisième target n’applique pas de fonction particulière, il s’agit d’itérer sur les listes de tuiles que l’on aura récupéré de la target précédente.

Le comportement du mot clef iteration="list" est différent selon qu’il s’applique à une target dynamique d’une target non dynamique. Dans le premier cas, il aggrége les résultat, alors que dans le second cas il se content d’itérer. Ici nous nous situons bien dans le second cas.

Nous pouvons savoir si il s’agit d’un branchement dynamique ou pas en fonction de la présence du mot clef pattern. Si il est présent c’est que le branchement sera de type dynamique, sinon c’est qu’il s’agit d’un branchement non dynamique.

La quatrième target récupère une liste avec ses deux composantes (classification et conf), et les passe à la fonction load_raster() dans /R_Scripts/utils.R. Comme son nom l’indique, cette fonction appele la fonction rast() de terra pour charger les deux fichiers de la tuile, avant de les renvoyer sous forme de SpatRasterCollection à la target suivante.

load_raster <- function(tiles) {
    # Load data
    classif <- tiles[[1]][1]
    conf <- tiles[[2]][1]
    classif <- terra::rast(classif[[1]])
    conf <- terra::rast(conf[[1]])
    return(sprc(classif,conf))
    }

Enfin la cinquième tâche du plan est celle qui lance le pipeline d’opération prenant une tuile en entrée :

    tar_terra_sprc(tiles,
        process_tile_pipeline(
        tiles_data=tile_pipeline_disagregated,
        year_folder=list_years,
        process_chain=PROCESS_CHAIN,
        slope_img=load_slope,
        height_img=load_height,
        start_step=start_step,
        end_step=end_step,
        visualize=FALSE),
        pattern=map(tile_pipeline_disagregated)),

Une fois ces différentes étapes mise ensemble, on obtient le [ pipeline final _targets.R suivant](https://gitlab.huma-num.fr/ar-magis-hpc/workshops/atelier-hpc-notebooks/atelier-hpc-sageo-2025/-/tree/master/postraitement-notebook-R

5.3 Parallélisation du code avec Python

Dask est le framework de référence en Python pour distribuer des calculs sur des environnements distribués, quelqu’il soit.

Dask en lui même est déjà complexe à aborder, mais c’est sans compter le grand nombre de librairie qui gravite, s’adapte, modifie, intègre ce dernier.

L’image générale qui résume Dask sur leur site web est celle-ci :

De la même façon que Target, Dask s’appuie sur la construction d’un graph. Seulement à la différence de Targets qui demande à ce que le workflow soit explicitement décrit, Dask propose de le construire à partir de votre code.

Dask s’appuie sur des bibliothèques déjà très établies dans le domaine du calcul scientifique avec Python, comme Numpy ou Panda.

De facto, les formats de Dask sont basés et/ou ressemble à ceux de Numpy/Panda.

Par exemple Dask et Panda possède la même forme d’API, et Dask vient seulement étendre les capacités de Panda. C’est le cas du Dask Dataframe qui est au final un ensemble de Panda Dataframe dont la coordination est faite par Dask.

Même chose pour le Dask Array qui coordonne un ensemble d’Array Numpy et possède une API en partie similaire.

Si le Dataframe est plutôt commun en géomatique pour tout ce qui est Analyse des Données, c’est un peu moins le cas des Array. C’est un format que nous allons manipuler plus loin dans notre chaine Python, en particulier via son extension XArray qui est parfaitement intégré et documenté avec Dask, à Panda, et à Numpy #1 #2

Une des différence avec les Array, c’est que XArray est multidimensionnel et possède des labels, et ne travaille donc pas que sur des chiffres.

Une des difficultés majeures pour nous géomaticien, c’est de comprendre que ces formats fonctionne pour la plupart en streaming sur la base d’un découpage en chunk. C’est à la fois leur avantage, car il n’est pas nécessaire de charger ni tout de suite (lazy evaluation), ni pendant l’opération de computation l’ensemble du fichier en mémoire. Mais c’est aussi leur défaut, car il faut savoir choisir les bons paramètres de _chunk_pour que cela fonctionne correctement #1 #2

L’autre perspective qui nous intéresse, c’est la présence d’une version géographique de Xarray, c’est à dire RioXarray. La librairie est encore jeune, mais c’est déjà un point de passage obligatoire vers Dask.

Un des avantages de Dask, c’est qu’il gère des formats spécifiques aux “Big Data”, qui dispose souvent de Bindings dans de nombreux langages (R, Python, etc.), comme Parquet (Tabular), HDF5 (Array) . Parquet et DuckDB par exemple sont de plus en plus utilisés en Analyse de Données sous R, pour aller au delà des capacités classiques des formats classiques, même optimisés (tibble, data.table). On retrouve des développements d’extensions, comme GeoParquet, pour la gestion des données spatiale avec Parquet.

5.4 Intégration Jupyter

Les deux étant dans l’écosystème Python, Dask est très bien intégré en tant que Widget à Jupyterlab.

Pour accéder au dashboard de dask (créé en local sur le noeud qui lance le job du cluster) il faut passer par le proxy : https://austral-hub.criann.fr/user//proxy//status

5.5 SLURM

La chaine tel que développé utilise directement des Workers locaux, mais il est aussi possible d’utiliser l’ordonnanceur SLURM qui est mis à disposition sur la plupart des Clusters. L’intérêt d’utiliser SLURM c’est qu’on peut se libérer de la machine sur lequel tourne Jupyter.

Par exemple si on a un Jupyter avec 30 coeurs et 64 Go de RAM, on va pouvoir accéder à des partitions plus importantes sur le CLUSTER. Au CRIANN par exemple il y a plusieurs types de partition qui sont fonction aussi des types de calcul à utiliser.

Pour passer de Workers qui tourne localement à des Workers qui tourne avec SLURM c’est assez simple :


from distributed import LocalCluster, Client
from dask_jobqueue import SLURMCluster

NB_WORKERS = 4
NB_CORES = 8
NB_THREADS = 4
MEMORY_LIMIT = "128GB"

cluster = SLURMCluster(cores=NB_CORES,
                       processes=NB_THREAD,
                       memory=MEMORY_LIMIT,
                       walltime="01:00:00",
                       queue="2tcourt",
                       job_extra_directives=['--reservation=sageo',],
                       log_directory="~/.temp")

cluster.adapt(minimum=NB_WORKERS, maximum=24)
    
client = Client(cluster)
client

Et on adapte un peu la méthode utilisé car on a fait remonté la définition de cluster :


def run_pipeline_with_slurm(
    data_folder: str,
    process_chain: List[Dict],
    start_step: Optional[str] = None,
    end_step: Optional[str] = None,
    visualize: bool = True,
    output_dir: Optional[str] = None
):
    """
    Run the processing pipeline for all years and tiles.
    
    Args:
        data_folder: Path to the data folder containing year directories
        process_chain: List of processing steps
        start_step: Name of the first step to run (or None for first step)
        end_step: Name of the last step to run (or None for last step)
        visualize: Whether to create visualizations
    """
    total_start = time()
    
    print(f"Processing steps: {start_step or 'beginning'} to {end_step or 'end'}")
    

        # Use OUTPUT_DIR if no custom output directory is provided
    if output_dir is None:
        output_dir = OUTPUT_DIR

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    years = find_years(data_folder)
    for year in years:
        year_start = time()
        year_folder = os.path.join(data_folder, "years", year)
        year_output_dir = os.path.join(output_dir, year)

        
        # Find input tiles
        input_folder = os.path.join(year_folder, "inputs")
        tile_paths = find_classification_tiles(input_folder)   
        print(f"Processing {len(tile_paths)} tiles for year {year}")
        
        # Create a delayed task for each tile
        tasks = [
            delayed(process_tile_pipeline)(
                tile_path=tile_path,
                year_folder=year_folder,
                process_chain=process_chain,
                start_step=start_step,
                end_step=end_step,
                visualize=visualize
            )
            for tile_path in tile_paths
        ]
        
        # Execute all tasks
        print(f"Launching {len(tasks)} parallel tasks...")
        results = dask.compute(*tasks)
        
        # Generate and print report using the generate_report function
        # report_dir = os.path.join(year_folder, "outputs", "reports")
        report = generate_report(results, year, output_dir=year_output_dir)
        
        # Print a summary of the processing time
        print(f"Year {year} processing time: {time() - year_start:.2f} seconds")
        
    print(f"Total processing time: {time() - total_start:.2f} seconds")
run_pipeline_with_slurm(
     data_folder=DATA_FOLDER,
     process_chain=PROCESS_CHAIN,
     visualize=True
)    
# Ensure client and cluster is closed even if an error occurs
client.close()
print("Dask client closed")

cluster.close()
print("Dask cluster closed")

5.6 Parallélisation du code avec OpenMOLE

5.6.1 Introduction à OpenMOLE

Le logiciel libre et open source OpenMOLE, développé depuis 2008 à l’ISC-PIF, est une plateforme dédiée à l’exploration et la validation des modèles de simulation en particulier, mais consiste en un système de workflow scientifique assez générique pour embarquer tout type de code et le distribuer sur des infrastructures de calcul haute performance.

Il repose sur 3 axes fortement complémentaires: - l’embarquement de tout type de code, avec des tâches dédiées pour les languages les plus courant (Python, R, Java, scala, NetLogo) et la possibilité d’éxecuter des programmes natifs ou des containers docker; - l’accès transparent aux principaux environnements de calcul distribué (local, serveur, cluster, grille de calcul); - L’intégration de méthodes état-de-l’art en analyse de sensibilité, exploration et validation des modèles de simulation.

En pratique, la façon la plus ergonomique pour utiliser OpenMOLE est via une application web, avec une instance hébergée sur un serveur distant. La GUI permet de gérer les projets, éditer les scripts, lancer et suivre les calculs, explorer les résultats.

Chaque participant au TP dispose à présent d’un compte sur le service my.openmole, qui propose des instances d’OpenMOLE déjà déployées et hébergées aux académiques.

Connectez-vous à votre instance et explorez la GUI, les différentes façon de créer des projets, l’éditeur de scripts

Clonez le dépôt de l’atelier : “New Project”, “From git repository”, avec l’URL https://gitlab.huma-num.fr/ar-magis-hpc/workshops/atelier-hpc-notebooks/atelier-hpc-sageo-2025.git

5.6.2 Tutoriel pour le language de script (DSL) d’OpenMOLE

Le language de script (Domain Specific Language) d’OpenMOLE est basé sur le language scala.

Les prototypes (Val) permettent de stocker les variables et leur valeur lors d’expériences numériques : il s’agit des paramètres explorés du modèle.

   val i = Val[Double]
   val j = Val[Double]

Les tâches (Task) permettent d’embarquer et d’éxécuter les modèles, comme par exemple du code scala:

   val by2 = ScalaTask("val j = i * 2") set (
    inputs += i,
    outputs += (i, j),
    i := 10.0
  )

Les paramètres d’input et d’output sont essentiels dans un script OpenMOLE : ceux-ci permettent de définir les paramètres explorés (input) et les sorties récupérées (output).

Un script, pour être exécutable, doit toujours se conclure par une tâche qui sera alors éxécutée, et sur laquelle on peut brancher des sorties (ici affichage des résultats avec hook display) :

  by2 hook display

Execution du script 1_tutoriel/1_Hello.oms

Le language de script permet alors de composer les tâches pour créer des workflow complexes. Par exemple, pour enchaîner 2 tâches by2 et plus1, la syntaxe est:

  (by2 hook display) -- (plus1 hook display)

Execution du script 1_tutoriel/2_Sequence.oms

Des tâches spécifiques existent pour chaque méthode d’exploration, d’analyse de sensibilité, et de validation. Par exemple, la méthode la plus simple est un échantillonage explicite des points de paramètres sur lesquels le modèle sera éxecuté :

  val i = Val[Int]
  val j = Val[Double]

  DirectSampling(
    evaluation = myModel,
    sampling =
      (i in (0 to 10 by 2)) x
      (j in (0.0 to 5.0 by 0.5))
  ) hook(
    output = display,
    values = Seq(i)
  )

Execution du script 1_tutoriel/3_Explore.oms

5.6.3 Embarquement de code avec une PythonTask

Pour embarquer du code python, il existe une PythonTask qui permet de fournir directement le code python, et de brancher les prototypes OpenMOLE sur les variables python de manière simple (primitive mapped) :

   val pythonTask = PythonTask(workDirectory / "hello.py") set (
    inputs += arg.mapped,
    outputs += arg,
    outputs += output mapped "output.txt",
)

Execution du script 2_PythonTask/1_Example.oms

Il est possible de spécifier les bibliothèques python à installer pour le script. Ainsi, on peut implémenter la première étape du workflow de l’atelier, sous la forme d’une PythonTask, utilisant la bibliothèque geotiff.

Execution du script 2_PythonTask/2_Binarize_with_geotiff.oms

5.6.4 Embarquement du workflow

Le code python du workflow est intégré avec des PythonTask. Nous utilisons toujours GDAL, n’ayant pas besoin des bibliothèques spécialisées de parallélisation vues ce matin, puisque la parallélisation est assurée par OpenMOLE. L’installation du package python GDAL nécessite l’installation préalable de bibliothèques système. Cela est rendu possible par l’argument install de la tâche, qui permet l’exécution de commandes arbitraires en amont de l’installation des packages python.

    install = Seq("sudo apt-get update && sudo apt-get install --no-install-recommends -y gdal-bin libgdal-dev geos-bin libgeos-dev proj-bin libproj-dev")

Inspection du script 3_Workflow/1_Binarize.oms

Afin de pouvoir éxécuter le workflow, nous avons besoin d’accéder aux tuiles .tif depuis l’instance OpenMOLE. Il y a alors plusieurs options : - uploader les données sur l’instance - récupérer directement les données sur l’instance via “New project > From URL” - uploader les données via ssh directement sur le cluster de calcul (ou les récupérer dans un répertoire partagé sur ce cluster comme la première partie du TP, cependant l’accès à des volumes extérieurs via OpenMOLE peut être compliqué). - utiliser des Source directement dans le workflow openmole (encore limité à des données locales ou un URL http sans redirection et sans décompression)

Pour des raisons de simplicité, nous utiliserons la deuxième option, avec seulement 3 tuiles (le disque sur les instances est limité) et 2 années.

Dans le dossier “3_Workflow” sur votre instance, faire “New project > from URL” avec extraction de l’archive, avec l’URL suivante : https://nextcloud.openmole.org/s/8nMbaJfJTzMRadW/download

Une fois les données téléchargées sur l’instance, il est possible d’y accéder via inputFiles, comme présenté ci-dessus.

Les ressources des instances étant limitées, ne pas exécuter ce script et attendre l’étape de distribution sur le cluster de calcul.

On peut enfin utiliser le séquençage de tâches pour déployer le workflow. On commence par les deux premières tâches.

Inspection du script 3_Workflow/2_Workflow2Steps.oms

Et enfin le workflow complet.

Inspection du script 3_Workflow/3_Workflow.oms

Notes : - la stratégie de parallélisation ici est par tuile, par année, et par tâche : il n’y a pas besoin d’utiliser de bibliothèques de parallélisation avancées comme vues dans la première partie du TP - chaque étape du workflow a été décomposée sous forme d’une tâche individuelle, ce qui est le plus flexible pour la parallélisation mais pas forcément en termes de performance globale - des hook récupérant les fichiers intermédiaires peuvent être utilisés à chaque étape du workflow

5.6.5 Embarquement du workflow avec une ContainerTask

D’une part, le nombre de bibliothèques pour le workflow complet est assez conséquent et leur installation un peu longue; d’autre part, un bug (voir ici, fixé ce matin par Romain Reuillon, merci !) empêchait jusqu’à ce matin l’installation des bibliothèques systèmes nécessaire pour le binding python de GDAL. Dans de tels cas, il peut être ainsi plus aisé dans notre cas d’utiliser un container Docker déjà préparé, et d’utiliser une ContainerTask:

  val container = ContainerTask(
    "python:3.10-stretch",
    """python matrix.py data.csv ${i} out.csv""",
    install = Seq("pip install numpy")
  ) set (
    resources += workDirectory / "matrix.py",
    inputFiles += (dataFile, "data.csv"),
    outputFiles += ("out.csv", resultFile),
    (inputs, outputs) += (i, dataFileName)
  )

La tâche container, exécutée en pratique via singularity, offre moins de flexibilité sur le mapping des prototypes, mais permet d’embarquer un container docker arbitraire, et donc tout type d’application : il s’agit de l’option d’embarquement dans OpenMOLE la plus générique.

Voici la première étape du workflow embarquée avec une ContainerTask :

  val year = Val[Int]
  val tile = Val[String]

  val dataFolder = Val[File]
  val resultFolder = Val[File]

  ContainerTask(
    "justeraimbault/sageo-hpc-openmole:1.1",
    """python binarize.py data ${year} ${tile}"""
  ) set (
    resources += workDirectory / "functions.py",
    resources += workDirectory / "binarize.py",
    inputs += (year, tile),
    inputFiles += (dataFolder, "data"),
    outputFiles += ("output",resultFolder),
    year := 2017,
    tile := "T30TWP",
    dataFolder := workDirectory / "data"
  ) hook (workDirectory / "output_binarize")

Inspection du script 4_WorkflowContainer/1_Binarize.oms

On pourrait enfin utiliser le séquençage de tâches pour déployer le workflow complet, de la même façon que précedemment avec les PythonTask.

Si vous voulez exécuter ces scripts dans la dernière étape, plutôt que les scripts avec PythonTask, il faudra déplacer ou copier les données.

5.6.6 Distribution des calculs sur le cluster

Par default, OpenMOLE parallélise au niveau des tâches, avec les ressources disponibles.

Nous avons vu qu’en local, l’exécution était très longue (et parfois échoue à cause de la mémoire disponible limitée). Il est possible de changer d’environnement pour lancer sur le cluster pédagogique Zen (cluster Slurm).

Par default, OpenMOLE s’exécute sur un environnement local. Le nombre de coeurs peut être adapté avec :

  val env = LocalEnvironment(10)

Pour lancer les calculs sur le cluster Zen de Mesonet, il suffit d’invoquer un SlurmEnvironment, après avoir ajouté sa clef ssh privée permettant d’accéder au cluster (Authentications > New authentication > Cluster SSH private key).

val mesonet = SLURMEnvironment($USERID, "zen.univ-lille.fr", time = 20 minutes, memory = 4000 megabytes, modules = Seq("apptainer"))

Modifier les scripts précédents pour les exécuter sur le cluster

5.6.7 Parallélisation

Enfin, on peut paralléliser les années et les tiles, en utilisant une tâche d’exploration DirectSampling.

Modifier les scripts précédents pour traiter toutes les années et toutes les tiles