Benvenuto su #TechStation, l'hub di SDG Group dedicato alle ultime innovazioni in data e analytics! In questo articolo, dimostriamo come estendere l'uso di sistemi di templating come Jinja oltre i suoi confini tradizionali per automatizzare la creazione di pipeline dati. Attraverso un esempio pratico basato su Python e Delta Live Tables di Databricks, vedrai come un approccio guidato dai metadati possa standardizzare lo sviluppo, ridurre drasticamente il lavoro manuale e adattarsi a qualsiasi piattaforma dati.
Cercavi qualcos'altro? Guarda tutti gli altri contenuti qui.
Nel moderno data engineering, strumenti come dbt e Dataform hanno reso popolare l'uso dei templating engine per ottimizzare lo sviluppo delle pipeline dati.
In particolare, dbt si affida a Jinja: un templating engine basato su Python, noto per essere veloce, espressivo e flessibile.
Jinja permette agli sviluppatori di inserire variabili, strutture di controllo (come cicli e condizioni) e macro riutilizzabili direttamente all'interno di template testuali.
Queste funzionalità aiutano a semplificare la gestione del codice, a promuoverne il riutilizzo e a migliorare la leggibilità.
Considerando le capacità di Jinja, una domanda sorge spontanea: il suo utilizzo nel campo dell’ingegneria dei dati potrebbe essere esteso ad altri linguaggi e strumenti?
La risposta è sì, e questo articolo ne fornirà un semplice esempio.
Per questa dimostrazione, il contesto di riferimento sarà il framework Delta Live Tables (DLT) di Databricks e il linguaggio di programmazione sarà Python.
Passiamo ora alla definizione del problema. Popolare il livello bronze di una medallion architecture può essere un compito molto monotono per un data engineer.
Molto spesso, si tratta semplicemente di creare tabelle selezionando una serie di colonne che, a volte, devono essere rinominate o convertite ad un certo tipo di dato secondo degli specifici requisiti.
In una situazione come questa, Jinja può rivelarsi l'alleato perfetto. Poche righe di codice Python e un template possono automatizzare lo sviluppo in modo facile e veloce, facendo risparmiare al team tempo prezioso da dedicare ad attività più produttive.
In pratica, si immagini di avere dei requisiti che, per ogni entità da aggiungere al livello bronze a partire da file Parquet nel data lake, specificano una sorgente, un nome per l'entità e una lista di attributi, potenzialmente con tipi di dato e alias specifici.
Se non lo sono già, è molto semplice trasporre i requisiti in formato tabulare organizzandoli in due DataFrame Pandas: entities (con le colonne source_location e entity_name) e attributes (con le colonne entity_name, attribute_name, data_type e attribute_alias).
Ora che i requisiti sono salvati in un formato più comodamente accessibile, è il momento di costruire un template Jinja che contenga la struttura comune su cui si baseranno le entità DLT.
In questo caso, la struttura è molto diretta: una streaming table DLT creata da una selezione di colonne che, se necessario, vengono rinominate e convertite a specifici tipi di dato.
Questo file definisce la struttura del codice Python relativo alla streaming table, con segnaposto che verranno popolati dinamicamente.
import dlt |
A questo punto, basta solo combinare gli ingredienti. Un semplice script Python può essere utilizzato per iterare sui requisiti contenuti nei DataFrame entities e attributes e generare il codice finale utilizzando il template Jinja. Il codice ottenuto viene poi salvato in file Python individuali, uno per ogni entità.
Nota: in questo esempio, i requisiti delle tabelle sono stati inseriti direttamente nello script come dizionari Python e letti come DataFrame Pandas per semplicità, ma nella realtà potrebbero provenire da qualsiasi tipo di file di configurazione (CSV, JSON, tabelle di un database, ecc.).
| import pandas as pd from jinja2 import Template # Entities definition entities = pd.DataFrame([ {"entity_name": "customers", "source_location": "abfss://myContainer@myStorageAccount.dfs.core.windows.net/raw/customers/*/*.parquet"}, {"entity_name": "orders", "source_location": "abfss://myContainer@myStorageAccount.dfs.core.windows.net/raw/orders/*/*.parquet"}, {"entity_name": "products", "source_location": "abfss://myContainer@myStorageAccount.dfs.core.windows.net/raw/products/*/*.parquet"}, ]) # Attributes with optional cast and alias attributes = pd.DataFrame([ {"entity_name": "customers", "attribute_name": "customer_id", "data_type": "int", "attribute_alias": None}, {"entity_name": "customers", "attribute_name": "name", "data_type": "string", "attribute_alias": "full_name"}, {"entity_name": "customers", "attribute_name": "signup_ts", "data_type": "timestamp", "attribute_alias": "signup_date"}, {"entity_name": "orders", "attribute_name": "order_id", "data_type": "int", "attribute_alias": None}, {"entity_name": "orders", "attribute_name": "customer_id", "data_type": "int", "attribute_alias": None}, {"entity_name": "orders", "attribute_name": "order_total", "data_type": "double", "attribute_alias": "total_amount"}, {"entity_name": "orders", "attribute_name": "order_date", "data_type": "date", "attribute_alias": None}, {"entity_name": "products", "attribute_name": "product_id", "data_type": "int", "attribute_alias": None}, {"entity_name": "products", "attribute_name": "name", "data_type": "string", "attribute_alias": "product_name"}, {"entity_name": "products", "attribute_name": "price", "data_type": "double", "attribute_alias": None}, {"entity_name": "products", "attribute_name": "category", "data_type": "string", "attribute_alias": None}, ]) # Loading the Jinja template from the demo.jinja file with open("demo.jinja") as demo_template_file: demo_template = Template(demo_template_file.read()) # Rendering code for each entity and saving it to a Python file for _, entity in entities.iterrows(): entity_attrs = attributes[attributes.entity_name == entity["entity_name"]].to_dict(orient="records") entity_code = demo_template.render( entity_name = entity["entity_name"], source_location = entity["source_location"], columns = entity_attrs ) with open(f"{entity['entity_name']}.py", "w") as entity_file: entity_file.write(entity_code) |
Ecco come appare il codice finale generato per l'entità customers.
|
import dlt
from pyspark.sql.functions import * @dlt.table( comment = "Bronze customers table" ) def customers(): df = ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "parquet") .load("abfss://myContainer@myStorageAccount.dfs.core.windows.net/raw/customers/*/*.parquet") .select(
col("customer_id").cast("int"), col("name").cast("string").alias("full_name"), col("signup_ts").cast("timestamp").alias("signup_date") ) ) return df |
Questo approccio non solo riduce la ripetitività, ma garantisce anche coerenza nel codice e rende le modifiche molto più facili da gestire.
Se, ad esempio, un pattern specifico deve essere modificato in tutte le tabelle del livello bronze, è sufficiente aggiornare il template Jinja una sola volta, senza più dover intervenire manualmente su decine di script Python.
Questa configurazione modulare si integra perfettamente in un flusso di lavoro CI/CD. Lo script può rigenerare le definizioni DLT automaticamente in base a requisiti aggiornati, sbloccando un modello molto potente: lo sviluppo di pipeline dati dichiarativo e guidato dai metadati dove i data engineer o gli analisti definiscono il "cosa" (le entità e gli attributi), mentre il sistema si occupa del "come" (la logica Python ripetitiva).
Una considerazione ancora più importante è che questo concetto è indipendente dalla piattaforma su cui si lavora. Sebbene la dimostrazione utilizzi delle Delta Live Tables in Databricks, la stessa logica può essere adattata ad altri ambienti. I notebook di Microsoft Fabric o i jobs Glue su AWS, ad esempio, potrebbero facilmente essere sviluppati utilizzando questo processo, semplicemente modificando il template e la logica di generazione.
Per riassumere, i templating engine come Jinja non sono solo utili nello sviluppo web o in dbt: sono una soluzione versatile e completa per ridurre la ripetitività e aumentare la coerenza nella scrittura di codice.
Estendere l'uso di Jinja all'automazione dello sviluppo di pipeline dati può migliorare significativamente la produttività di un team.
Questa dimostrazione è molto semplice, ma le sue implicazioni sono potenzialmente enormi. Con poche righe di codice e dei template, è possibile produrre pipeline dati strutturate e scalabili, più facili da mantenere, più veloci da implementare e più allineate a una visione di infrastruttura dati dichiarativa.
Questo semplice concetto può essere esteso e perfezionato per supportare la validazione della qualità dei dati, le dichiarazioni di dipendenze tra oggetti e le trasformazioni in altri livelli dell’architettura (silver, gold), semplicemente espandendo il modello di metadati e arricchendo i template.
Vuoi accelerare lo sviluppo delle tue pipeline dati e ridurre il lavoro manuale? Contattaci per una consulenza personalizzata e scopri come i nostri framework di automazione su misura, basati su strumenti come Jinja e Python, possono standardizzare e scalare la tua data factory, liberando il tuo team per attività a più alto valore aggiunto.