Le projet sur lequel j’ai travaillé porte sur la mise en place d’un pipeline efficace de préparation des données couvrant les différents aspects que sont l’ingestion du jeu de données public Online Retail (UCI) dans mon datawarehous, le nettoyage, l’enrichissement, le stockage sous Delta Lake avec l’ajout de contraintes logiques, desrequêtes analytiques et l’optimisation de lecture via le partitionnement des données.
Vous pouvez retrouver mon travail dans la section Projets du site et sur le dépôt GitHub.
Contexte
Dans cette note je montre comment j’ai utiliser le partitionnement avec PySpark, pour optimiser les performances des requêtes d’une plateforme d’e-commerce.
Le contexte est le suivant, une plateforme de retail en ligne a un fichier de données brutes contenant près de 540k observations (360k après nettoyage), dans lequel on trouve des valeurs manquantes, des annulations de commandes, des prix ou quantités aberrantes et un schéma peu adapté aux analyses fiables.
Le problème
Le besoin est de construire un pipeline reproductible qui :
- fiabilise les lignes utilisées pour l’évaluations du chiffre d’affaires et la segmentation client
- guarantisse la qualité des futurs chargements (contraintes sur les delta tables) pour pouvoir par la suite automatiser le processus
- permet des requêtes plus rapides (optimisation grâce au format parquet)
- supporte l’évolution des données (merge, historique Delta, comparaison de versions)
Durant notre analyse, nous allons nous concentrer sur le point 3, et voir ensemble pourquoi l’utilisation de Spark et du format parquet est un plus et en quoi le partitionning va permettre d’optimiser d’avantage les requêtes. Pour ce faire nous allons comparer les performances de notre base de données non-partitionnée à celle partitionnée en mesurant le temps d’xécution avec time.time().
Spark
1. Rappel
Les organisations font de plus en plus face simultanément à une volumétrie et à une variété importante dans les données. La difficulté n’est pas seulement de stocker, mais de traiter à grande échelle les données, en gardant une certaine tolérance aux pannes, tout en absorbant des formats hétérogènes et évolutifs (JSON, CSV, XML, texte brut).
Spark permet de standardiser l’analyse autour du DataFrame en un tableau distribué avec un schéma, qui combine l’ergonomie SQL et la flexibilité d’une API de transformation. Le schéma est ce qui permet à Spark d’optimiser les plans (column pruning, prédicate pushdown) et garantir la robustesse des transformations.
Car pour fonctionner, Spark SQL compile une requête en 2 plans, l’un logique, l’autre physique, en faisant cela Spark exploite fortement la mémoire pour accélérer les itérations. Ce mode de fonctionnement est ce qui fait que l’on qualifie Spark de lazy language, à savoir que les calculs ne sont déclenchés que lorsqu’on appelle une action (count, collect, reduce…). Spark va vous permettre de retarder l’exécution des opérations jusqu’au moment où cela devient nécessaire (appel à l’action), cette évaluation paresseuse permet d’optimiser le plan logique d’exécution avant de toucher au disque (plan physique).
2. Parquet
Le format Parquet est aujourd’hui devenu un standard dans les architectures de données distribuées, car il est conçu spécifiquement pour les traitements analytiques. Contrairement aux formats classiques comme CSV ou JSON, qui stockent les données ligne par ligne, Parquet adopte un stockage en colonnes. Cette différence change profondément la manière dont Spark lit les données. Dans un fichier CSV, chaque ligne contient l’ensemble des colonnes d’un enregistrement. Si une table possède cinquante colonnes mais qu’une requête n’en utilise que trois, Spark doit malgré tout parcourir l’intégralité du fichier et lire toutes les colonnes. Cela génère beaucoup d’entrées/sorties sur le disque (IO), ce qui ralentit fortement les traitements sur de gros volumes.
Avec Parquet, les colonnes sont stockées séparément. Spark peut alors lire uniquement les colonnes nécessaires à la requête. On parle de “projection pushdown”. Si une analyse ne nécessite que les colonnes date, montant et client_id, seules ces colonnes seront chargées en mémoire. Le volume de données lu diminue donc considérablement, ce qui réduit les IO, accélère les traitements et diminue la consommation mémoire.
Cette logique devient particulièrement importante dans les architectures analytiques où les tables contiennent parfois des centaines de colonnes mais où chaque requête n’en exploite qu’une petite partie.
Parquet apporte également un second avantage majeur, le “predicate pushdown”, c’est-à-dire la capacité à pousser les filtres directement au niveau du stockage. Les fichiers Parquet contiennent des métadonnées statistiques sur les données stockées dans chaque bloc, comme les valeurs minimales et maximales des colonnes. Spark peut utiliser ces informations pour éviter de lire des blocs entiers inutiles.
3. Le partitionnement
Le partitionnement consiste à découper physiquement les données en plusieurs répertoires selon la valeur d’une colonne. Dans Spark, une table partitionnée par annee et mois produira par exemple une arborescence de ce type :
/annee=2025/mois=01/
/annee=2025/mois=02/
/annee=2025/mois=03/Si une requête demande uniquement les données de février 2025, Spark ne lira que le dossier correspondant (partition pruning).
Le partitionnement doit être pensé selon les usages réels des données. Une erreur fréquente consiste à partitionner sur une colonne peu utilisée dans les filtres. Dans ce cas, Spark ne pourra presque jamais éliminer des partitions et devra malgré tout parcourir la majorité des données. Le partitionnement devient alors inutile.
Pire encore, un mauvais choix de partitionnement peut dégrader les performances. Si la colonne choisie possède énormément de valeurs distinctes commme par exemple un identifiant utilisateur ou une colonne de numéros unique de transaction, Spark va créer un très grand nombre de petits répertoires et donc une multitude de petits fichiers. Ce phénomène est connu sous le nom de “small files problem”.
Les petits fichiers sont problématiques dans les systèmes distribués comme HDFS (Hadhoop Distributed Files System) ou le stockage objet cloud. Chaque fichier possède des métadonnées, des coûts d’ouverture et des opérations réseau. Lorsque Spark doit gérer des millions de petits fichiers, le temps passé à planifier les tâches devient parfois plus important que le traitement lui-même. Le NameNode sur HDFS peut également devenir saturé par la quantité de métadonnées à maintenir. C’est l’une des plus grande limite de Spark.
Le bon partitionnement repose donc sur plusieurs critères :
- la colonne doit être fréquemment utilisée dans les filtres
- elle doit avoir une cardinalité raisonnable
- les partitions doivent rester relativement équilibrées
- il faut éviter de générer trop de petits fichiers
À présent voyons quelle méthodologie j’ai utlisé dans mon projet.
La méthodologie
1. Le choix du partitionnement
J’ai choisi un partitionnement par pays et continent, parce lorsqu’on travaille sur un dataset transactionnel de type retail, la première question à se poser avant de choisir une clé de partition est : quelle est la dimension dominante dans les requêtes analytiques ?
Ici, les analyses gravitent autour de la géographie — chiffre d’affaires par pays, comportement d’achat par région, comparaisons inter-continents. Partitionner par Country est donc le choix naturel.
Mais Country seul pose un problème car les données sont distribuées de manière asymétriques. Sur ce dataset, le Royaume-Uni représente à lui seul plus de 80% des transactions. Une partition Country=United_Kingdom serait donc massivement plus grande que les autres, c’est ce qu’on appelle du data skew (déséquilibre qui va ralentir une partition).
Ajouter Continent comme deuxième clé de partition répond à deux objectifs distincts. D’abord, ça crée une hiérarchie de filtrage, par exemple une requête qui cible WHERE Continent = 'Europa' peut éliminer d’emblée toutes les partitions hors Europe sans scanner une seule ligne. Ensuite, ça ouvre la porte à des analyses multi-niveaux, car on peux agréger au niveau pays dans une sous-requête, puis consolider au niveau continent et Spark peut dans ce cas tirer parti de la structure physique des partitions pour optimiser les deux niveaux d’agrégation en cascade.
2. L’utilisation du bucketing
Le bucketing répond à un problème différent du partitionnement. Le partitionnement organise les fichiers dans des répertoires physiques séparés selon les valeurs d’une colonne. Le bucketing, lui, distribue les lignes dans un nombre fixe de fichiers (les buckets) en appliquant une fonction de hachage sur la ou les colonnes choisies.
Par exemple en utilisant 8 buckets sur (Country, Shopsize), Spark calcule hash(Country, Shopsize) % 8 pour chaque ligne et l’envoie dans le fichier correspondant. Toutes les lignes ayant le même couple (Country, Shopsize) atterrissent donc dans le même bucket.
L’intérêt principal du bucketing se manifeste lors des jointures et des agrégations groupées. Si deux tables sont bucketisées sur les mêmes colonnes avec le même nombre de buckets, Spark peut joindre bucket_0 de la table A avec le bucket_0 de la table B directement, sans shuffle. Le shuffle (échange de données sur le réseau entre différents exécuteurs) étant l’une des opérations les plus coûteuses dans Spark. L’éliminer sur une jointure fréquente peut diviser le temps d’exécution par 2 à 5 selon la taille des données et le cluster.
J’ai choisi de procéder le bucket sur les colonnes Country et Shopsize parce que les requêtes de ce projet filtrent et agrègent régulièrement sur ces deux dimensions ensemble. Cela permet de voir le comportement d’achant par taille de commandes dans chaque pays (segments géographique de dépense).
J’ai également choisi d’utiliser 8 buckets car en règle générale, le nombre de buckets utilisées est un multiple du nombre de coeurs disponibles pour maximiser le parallélisme, et est d’un ordre de grandeur cohérent avec la tille des données pour éviter les buckets vides ou trop petites.
Le résultat
voici le schéma ASCII du plan d’exécution AQE:
┌───────────────────────────┐
│ SQL Query Input │
│ SELECT CustomerID │
│ FROM phase4 │
│ WHERE Country='France' │
└─────────────┬─────────────┘
│
▼
┌───────────────────────────┐
│ 1️⃣ Parsed Logical Plan │
│ - Projection CustomerID │
│ - Filter Country='France' │
│ - UnresolvedRelation phase4 │
└─────────────┬─────────────┘
│
▼
┌───────────────────────────┐
│ 2️⃣ Analyzed Logical Plan │
│ - Columns resolved │
│ CustomerID: string │
│ - Table schema known │
│ - Filter and Project clear │
└─────────────┬─────────────┘
│
▼
┌───────────────────────────┐
│ 3️⃣ Optimized Logical Plan │
│ - Null checks added │
│ (isnotnull(Country)) │
│ - Filter pushdown planned │
│ - Redundant ops removed │
└─────────────┬─────────────┘
│
▼
┌───────────────────────────┐
│ 4️⃣ Physical Plan │
│ - FileScan Parquet │
│ - Filter Pushdown (Country) │
│ - ColumnarToRow conversion │
│ - Projection CustomerID │
│ - AQE can adjust partition │
└───────────────────────────┘
Voici ma requête non-partitionnée:
query_non_part = """
SELECT
Country,
ROUND(SUM(OrderAmount),2) AS total_revenue
FROM phase4
GROUP BY Country
ORDER BY total_revenue DESC
"""Voici ma requête partitionnée:
query = """
CREATE TABLE IF NOT EXISTS sales_per_country_continent
USING csv
PARTITIONED BY (Country, Continent)
AS SELECT * FROM phase4
"""
spark.sql(query)Le temps d’exécution de la requête partitionnée est de 2.4516093730926514 seconds, le temps d’exécution de la requête partitionnée est de 0.9004688262939453 seconds. Soit un gain de près de 64% sur cette exécution en particulier. En répétant cette opération un grand volume de fois, le gain se stabilise autour de 60%.
Améliorations futures
Dans ce projet j’ai commencé par créer les tables partitionnées en CSV, en production l’idéal est de les créer au format delta ou parquet dès le départ afin de décupler les gains du partitionnement. Pareillement, la dimension temporelle n’est pas présente dans mon partitionnement, les données du dataset étant sur un an, une analyse pertinente est celle de l’évolution mensuelle du chiffre d’affaires qui mériterait une partition par mois.