Implementing Spatial ETL Pipelines with Apache Airflow

Modern geospatial workflows demand more than ad-hoc Jupyter notebooks and manual coordinate conversions. As organizations scale their location intelligence initiatives, the need for reliable, automated, and reproducible data workflows becomes critical. Implementing spatial ETL (Extract, Transform, Load) pipelines with Apache Airflow bridges the gap between raw geographic inputs and production-ready analytical datasets. By orchestrating Python-based GIS tasks through directed acyclic graphs (DAGs), teams can schedule, monitor, and scale spatial data transformations without sacrificing data integrity or computational efficiency.

At its core, a spatial ETL pipeline ingests heterogeneous geographic data—shapefiles, GeoJSON, PostGIS tables, or remote sensing rasters—cleans and enriches the geometries, and loads the results into a target database or analytics platform. Apache Airflow excels in this domain because it treats Python code as the primary orchestration language. This native compatibility allows GIS practitioners to leverage established libraries like GeoPandas, Shapely, and PyProj directly within Airflow tasks. The framework’s scheduler handles dependencies, retries, and logging, while custom Python operators execute the actual geospatial transformations.

Environment & Dependency Configuration

Before orchestrating workflows, you must establish a stable Python environment. Spatial Python packages often require compiled C libraries (e.g., GDAL, GEOS, PROJ), so using a virtual environment or containerized setup is strongly recommended. A production-grade requirements.txt typically includes:

apache-airflow>=2.7.0
geopandas>=0.14.0
shapely>=2.0.0
pyproj>=3.6.0
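psycopg2-binary>=2.9.0
sqlalchemy>=1.4  # Airflow 2.x pins SQLAlchemy below 2.0
geoalchemy2  # required by GeoDataFrame.to_postgis()
pyarrow  # required for Parquet and GeoParquet I/O
fiona>=1.9.0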

Once dependencies are installed, Airflow needs an AIRFLOW_HOME directory (~/airflow by default). The scheduler discovers pipeline definitions in the configured dags_folder, which defaults to the dags/ folder inside AIRFLOW_HOME, so DAG files must be placed there.
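
A missing package usually surfaces only when a task first runs, so it can help to check the environment up front. The sketch below is one way to do that with the standard library. The helper names `installed_versions` and `missing_packages` and the package list are illustrative assumptions, and the check covers the Python distributions only, not the underlying GDAL, GEOS, or PROJ binaries.

```python
from importlib import metadata

# Distribution names taken from the requirements list; adjust to your project.
SPATIAL_PACKAGES = ["apache-airflow", "geopandas", "shapely", "pyproj", "fiona"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def missing_packages(packages):
    """Return the names that are not installed in the current environment."""
    return [name for name, ver in installed_versions(packages).items() if ver is None]


if __name__ == "__main__":
    for name, ver in installed_versions(SPATIAL_PACKAGES).items():
        print(f"{name}: {ver or 'NOT INSTALLED'}")
```

Running a check like this in the worker image at build time catches an incomplete environment before the scheduler ever parses a DAG.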

Architecting the DAG

In Airflow, a DAG defines the workflow structure, scheduling, and task dependencies. For spatial ETL, the pipeline typically follows three distinct phases:

  1. Extract: Pull raw spatial or tabular data from APIs, cloud storage, or legacy file systems.
  2. Transform: Convert coordinates, validate geometries, enrich attributes, and apply spatial operations.
  3. Load: Write processed features to a spatial database, data warehouse, or cloud storage bucket.

Extract (APIs · cloud storage · legacy files) → Transform (reproject · validate · enrich) → Load (PostGIS · warehouse · object store)

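Because XCom values are stored in Airflow’s metadata database, they are meant for small payloads. The practical size limit depends on the database backend, and a geospatial DataFrame can easily exceed it. Production pipelines should therefore pass data between tasks as intermediate files (e.g., Parquet or GeoParquet) on storage that every worker can reach, and exchange only the file paths.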
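
One way to apply this is to have each task return only a short file path, which fits comfortably in XCom, while the data itself stays on disk. The sketch below uses only the standard library. `staging_path`, `extract`, and the directory layout are hypothetical, and the base directory is assumed to be storage that every worker can reach.

```python
from pathlib import Path


def staging_path(base_dir, dag_id, run_id, name):
    """Build a per-run file path so concurrent or retried runs never collide."""
    # Airflow run_ids contain characters such as ':' and '+'; replace anything
    # that is awkward in file names with an underscore.
    safe_run_id = "".join(c if c.isalnum() or c in "-_." else "_" for c in run_id)
    return Path(base_dir) / dag_id / safe_run_id / name


def extract(base_dir, dag_id, run_id):
    """Write the extracted data to disk and return only its location."""
    out = staging_path(base_dir, dag_id, run_id, "raw.parquet")
    out.parent.mkdir(parents=True, exist_ok=True)
    # ... write the DataFrame here, e.g. df.to_parquet(out) ...
    return str(out)  # a short string is all that travels through XCom
```

With a PythonOperator, the callable's return value is pushed to XCom by default, so a downstream task can fetch the path with `ti.xcom_pull(task_ids="extract_addresses")` and open the file itself.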

Extract: Ingesting Heterogeneous Geospatial Data

The extraction phase focuses on reliable data acquisition. Whether reading CSVs with latitude/longitude columns, parsing GeoJSON from a REST API, or querying a legacy PostGIS table, the goal is to standardize inputs before transformation. Geocoding and reverse geocoding often occur at this stage when raw addresses or place names must be converted into machine-readable coordinates. Using libraries like geopy or geopandas.tools.geocode, you can batch-resolve locations and cache results to avoid hitting API rate limits.
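
The caching idea can be sketched independently of any particular geocoding service. The class below wraps an arbitrary geocoding callable with a JSON-file cache and a minimum delay between real requests. `CachedGeocoder`, its parameters, and the cache file format are assumptions made for illustration and are not part of geopy or GeoPandas.

```python
import json
import time
from pathlib import Path


class CachedGeocoder:
    """Wrap a geocoding callable with a JSON-file cache and a minimum delay."""

    def __init__(self, geocode_fn, cache_file, min_delay_s=1.0):
        self.geocode_fn = geocode_fn    # callable: address -> (lat, lon) or None
        self.cache_file = Path(cache_file)
        self.min_delay_s = min_delay_s  # pause enforced between real API calls
        self._last_call = None
        self.cache = {}
        if self.cache_file.exists():
            self.cache = json.loads(self.cache_file.read_text())

    def lookup(self, address):
        """Return [lat, lon] (or None), hitting the service only on a cache miss."""
        key = " ".join(address.lower().split())  # normalise case and whitespace
        if key in self.cache:
            return self.cache[key]
        # Throttle only when the remote service is actually contacted.
        if self._last_call is not None:
            wait = self.min_delay_s - (time.monotonic() - self._last_call)
            if wait > 0:
                time.sleep(wait)
        result = self.geocode_fn(address)
        self._last_call = time.monotonic()
        # Failed lookups (None) are cached too, so they are not retried each run.
        self.cache[key] = list(result) if result is not None else None
        return self.cache[key]

    def save(self):
        """Persist the cache so later DAG runs can reuse resolved addresses."""
        self.cache_file.write_text(json.dumps(self.cache))
```

With geopy, `geocode_fn` could be a small wrapper around a geocoder's `geocode()` method that returns `(location.latitude, location.longitude)`. geopy also provides its own `RateLimiter` helper in `geopy.extra.rate_limiter` for the throttling part.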

Transform: Core Python GIS Operations

The transformation phase is where Spatial Data Processing & Analysis becomes actionable. Raw coordinates are rarely ready for analytical consumption. They require coordinate reference system (CRS) alignment, geometry validation, and spatial enrichment.

A robust transform task applies the following stages in sequence:

Raw coordinates → CRS standardization (to a projected CRS) → Topology validation (make_valid()) → Spatial joins and overlays (enrich) → Spatial indexing (gdf.sindex) → Analysis-ready GeoDataFrame

In detail:

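  • CRS Standardization: Convert all layers to a common projected CRS suited to the area of interest, such as the local UTM zone or an equal-area projection, before computing distances or areas. EPSG:3857 (Web Mercator) is convenient for web maps but stretches lengths by roughly 1/cos(latitude), about 2× at 60° latitude, so it is a poor choice for measurement.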
  • Topology Validation and Cleaning: Real-world spatial data frequently contains self-intersections, duplicate nodes, or unclosed polygons. Using shapely.validation.make_valid() and GeoPandas filtering, you can repair invalid geometries before they cause downstream failures.
  • Spatial Joins and Overlays: Combining datasets based on geometric relationships (intersects, contains, within) is foundational for spatial enrichment. Understanding how Spatial Joins and Overlays function allows you to merge demographic, environmental, or infrastructure attributes onto your primary feature set efficiently.
  • Spatial Indexing for Performance: As dataset sizes grow, brute-force spatial comparisons become computationally prohibitive. GeoPandas builds a spatial index (an STRtree when backed by Shapely 2.0) on first use and reuses it during sjoin() operations; accessing gdf.sindex builds and caches it ahead of time. PyGEOS has been merged into Shapely 2.0, so its vectorized operations are now available through Shapely itself.
  • Downstream Readiness: Once geometries are clean and enriched, the dataset becomes a reliable input for advanced workflows like Network Analysis with Python, where routing, service area generation, or accessibility metrics depend on topologically sound, correctly projected features.
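
The stages above can be sketched on a toy dataset. The example assumes GeoPandas 0.14+ with Shapely 2.0. The layer names, attribute columns, and coordinates are invented for illustration, and a real pipeline would read both layers from files or a database instead.

```python
import geopandas as gpd
from shapely.geometry import Point, Polygon

# Hypothetical inputs: one self-intersecting "bowtie" zone and two sites.
zones = gpd.GeoDataFrame(
    {"zone": ["A"]},
    geometry=[Polygon([(0, 0), (2, 2), (2, 0), (0, 2), (0, 0)])],
    crs="EPSG:4326",
)
sites = gpd.GeoDataFrame(
    {"site": ["inside", "outside"]},
    geometry=[Point(0.2, 1.0), Point(1.0, 1.8)],
    crs="EPSG:4326",
)

# Topology validation: repair the bowtie into valid polygon parts.
zones["geometry"] = zones.geometry.make_valid()

# CRS standardization: estimate a UTM zone from the data extent, then
# reproject both layers so they share one projected CRS.
utm = zones.estimate_utm_crs()
zones = zones.to_crs(utm)
sites = sites.to_crs(utm)

# Spatial join: attach zone attributes to the sites that fall inside a zone.
# sjoin builds and queries the spatial index internally.
enriched = gpd.sjoin(sites, zones, how="inner", predicate="within")
print(enriched[["site", "zone"]])
```

Repairing geometries before the join matters because predicates evaluated against invalid polygons can raise errors or return misleading matches.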

Load: Publishing to Production Databases

The final phase writes the transformed GeoDataFrame to a target system. PostGIS remains the industry standard for spatial data storage. GeoDataFrame.to_postgis(), used with a SQLAlchemy engine and GeoAlchemy2, creates the table, derives the geometry column type and SRID from the GeoDataFrame, and writes rows in batches when chunksize is set. Writing in chunks keeps memory use bounded, while wrapping the load in a single transaction prevents partial writes. Make sure the geometry column ends up with a GiST index, creating one explicitly if needed.
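
A minimal sketch of a chunked, transactional load follows. `load_atomically` is a hypothetical helper. It assumes `gdf` is a GeoDataFrame, `engine` is a SQLAlchemy engine connected to a PostGIS database, and GeoAlchemy2 is installed. An explicit transaction is mainly useful when further statements, such as index creation or grants, should commit or roll back together with the write.

```python
def load_atomically(gdf, engine, table, schema="public", chunksize=10_000):
    """Replace `table` with the contents of `gdf` inside one transaction.

    Assumes `gdf` is a GeoDataFrame and `engine` a SQLAlchemy engine pointing
    at a PostGIS database; GeoAlchemy2 must be installed for to_postgis().
    """
    # engine.begin() commits on success and rolls back if anything raises,
    # so readers never observe a half-written table.
    with engine.begin() as conn:
        gdf.to_postgis(
            name=table,
            con=conn,
            schema=schema,
            if_exists="replace",
            index=False,
            chunksize=chunksize,  # rows written per batch, bounds memory use
        )
    return len(gdf)
```

Because PostgreSQL supports transactional DDL, the drop-and-recreate performed by `if_exists="replace"` is rolled back along with the rows if the write fails partway through.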

Production-Ready DAG Implementation

The following DAG demonstrates a complete, production-ready spatial ETL pipeline. It avoids XCom for large payloads, implements robust error handling, and follows Airflow best practices.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import geopandas as gpd
import pandas as pd
import tempfile
import os
from shapely.validation import make_valid
from sqlalchemy import create_engine, text
import logging

logger = logging.getLogger(__name__)

# Default DAG arguments
default_args = {
    "owner": "gis_engineering",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": True,
    "email_on_retry": False,
}

dag = DAG(
    dag_id="spatial_etl_pipeline",
    default_args=default_args,
    schedule="@daily",
    catchup=False,
    tags=["gis", "etl", "postgis"],
)

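# Temporary directory for inter-task data passing. gettempdir() is local to one
# worker, so this only works when all tasks run on the same machine; point it
# at shared storage when using the Celery or Kubernetes executor.
TEMP_DIR = tempfile.gettempdir()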
RAW_CSV_PATH = "/data/raw/addresses.csv"
PARQUET_PATH = os.path.join(TEMP_DIR, "spatial_etl_raw.parquet")
GEOPARQUET_PATH = os.path.join(TEMP_DIR, "spatial_etl_transformed.geoparquet")

def extract_addresses(**kwargs):
    """Ingest raw tabular data and save as Parquet for downstream tasks."""
    logger.info("Extracting address data from CSV...")
    try:
        df = pd.read_csv(RAW_CSV_PATH)
        # Basic validation
        required_cols = {"address", "latitude", "longitude"}
        if not required_cols.issubset(df.columns):
            raise ValueError(f"Missing required columns: {required_cols - set(df.columns)}")

        df.to_parquet(PARQUET_PATH, index=False)
        logger.info(f"Extracted {len(df)} records to {PARQUET_PATH}")
    except Exception as e:
        logger.error(f"Extraction failed: {e}")
        raise

def transform_geometries(**kwargs):
    """Convert to spatial, validate topology, project CRS, and save as GeoParquet."""
    logger.info("Transforming tabular data to spatial format...")
    try:
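        df = pd.read_parquet(PARQUET_PATH)

        # 1. Drop rows without usable coordinates, then create the GeoDataFrame.
        #    points_from_xy takes x (longitude) first, then y (latitude).
        df = df.dropna(subset=["latitude", "longitude"])
        gdf = gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(df.longitude, df.latitude),
            crs="EPSG:4326",
        )

        # 2. Topology validation & cleaning. Points are always valid, so
        #    make_valid is a no-op here; it matters for lines and polygons.
        gdf = gdf[~(gdf.geometry.isna() | gdf.geometry.is_empty)].copy()
        gdf["geometry"] = gdf.geometry.apply(make_valid)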

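        # 3. Project to a metric CRS. EPSG:3857 keeps the example simple, but it
        #    distorts distances and areas away from the equator; prefer a local
        #    UTM zone or an equal-area CRS when measurements matter.
        gdf = gdf.to_crs("EPSG:3857")

        # 4. No explicit spatial index is built here: gdf.sindex lives in memory
        #    only and is not written to GeoParquet. sjoin() builds it on demand.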

        # Save as GeoParquet (preserves CRS and geometry type)
        gdf.to_parquet(GEOPARQUET_PATH, index=False)
        logger.info(f"Transformed {len(gdf)} valid geometries to {GEOPARQUET_PATH}")
    except Exception as e:
        logger.error(f"Transformation failed: {e}")
        raise

def load_to_postgis(**kwargs):
    """Stream GeoDataFrame to PostGIS with spatial index creation."""
    logger.info("Loading transformed data to PostGIS...")
    # Fail fast if the connection string is missing instead of falling back to
    # hardcoded credentials.
    db_url = os.environ["POSTGIS_CONNECTION_STRING"]

    try:
        engine = create_engine(db_url)
        gdf = gpd.read_parquet(GEOPARQUET_PATH)

        # Load to database. to_postgis() derives the geometry column type and
        # SRID from the GeoDataFrame, so no dtype override is needed.
        gdf.to_postgis(
            name="processed_addresses",
            con=engine,
            if_exists="replace",
            index=False,
            schema="public",
        )

        # Ensure a spatial index exists for query optimization. engine.begin()
        # commits on success and works on both SQLAlchemy 1.4 and 2.0.
        with engine.begin() as conn:
            conn.execute(text(
                "CREATE INDEX IF NOT EXISTS idx_processed_addresses_geom "
                "ON public.processed_addresses USING GIST (geometry);"
            ))

        logger.info("Successfully loaded and indexed data in PostGIS.")
    except Exception as e:
        logger.error(f"Database load failed: {e}")
        raise

# Define tasks
extract_task = PythonOperator(
    task_id="extract_addresses",
    python_callable=extract_addresses,
    dag=dag,
)

transform_task = PythonOperator(
    task_id="transform_geometries",
    python_callable=transform_geometries,
    dag=dag,
)

load_task = PythonOperator(
    task_id="load_to_postgis",
    python_callable=load_to_postgis,
    dag=dag,
)

# DAG execution order
extract_task >> transform_task >> load_task

Key Production Considerations

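  • Stateless Task Design: Each function reads from and writes to explicit file paths instead of relying on in-memory state. This makes tasks independently testable and safe to retry. When tasks run on more than one worker, those paths must point to shared storage.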
  • CRS Management: Always define source CRS explicitly. Relying on implicit CRS assumptions leads to silent misalignments during spatial joins.
  • Database Credentials: Never hardcode connection strings. Use Airflow Connections (airflow connections add) and retrieve them via BaseHook.get_connection() or environment variables.
  • Monitoring & Alerting: Leverage Airflow’s built-in logging, Slack/Email alerts, and custom metrics to track pipeline latency, failure rates, and row counts.
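
For the credentials point, a connection stored in Airflow can be turned into a SQLAlchemy URL inside the task. The sketch below is one possible helper. `sqlalchemy_url` and the connection id `postgis_default` are hypothetical names, and a stand-in object replaces the real Airflow connection so the example runs without Airflow installed.

```python
from types import SimpleNamespace
from urllib.parse import quote_plus


def sqlalchemy_url(conn, driver="postgresql+psycopg2"):
    """Build a SQLAlchemy URL from an object shaped like an Airflow Connection.

    Expects the attributes login, password, host, port and schema (the
    database name), which is how Airflow's Connection model exposes them.
    """
    # Percent-encode credentials so characters like '@' or '/' stay unambiguous.
    user = quote_plus(conn.login or "")
    password = quote_plus(conn.password or "")
    port = f":{conn.port}" if conn.port else ""
    return f"{driver}://{user}:{password}@{conn.host}{port}/{conn.schema}"


# Inside a task this object would come from Airflow, for example:
#   from airflow.hooks.base import BaseHook
#   conn = BaseHook.get_connection("postgis_default")  # hypothetical conn id
# A stand-in with made-up values is used here for illustration.
conn = SimpleNamespace(
    login="etl_user", password="p@ss/word", host="db.internal", port=5432, schema="gis_db"
)
print(sqlalchemy_url(conn))
```

Resolving the connection inside the task callable, not at module import time, keeps secrets out of DAG parsing and lets credentials rotate without a code change.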

Conclusion

Implementing spatial ETL pipelines with Apache Airflow transforms fragile, manual GIS workflows into scalable, auditable data products. By combining Airflow’s orchestration capabilities with Python’s mature geospatial ecosystem, teams can automate coordinate transformations, enforce topology rules, optimize spatial indexing, and reliably publish datasets to enterprise databases. As your spatial infrastructure matures, this foundation seamlessly supports advanced analytical workflows, ensuring that location intelligence remains accurate, reproducible, and ready for production consumption.