
🚕 TaxiPulse: Real-Time NYC Taxi Analytics Engine

An end-to-end data engineering platform that processes 9.5M+ NYC taxi records through batch (Airflow) and streaming (Kafka) pipelines, with a Medallion Architecture, automated data quality checks, a star schema warehouse, Z-score anomaly detection, and an interactive Streamlit dashboard.



🔗 Live Demo

| Resource | Link |
|---|---|
| 📊 Analytics Dashboard | taxipulse-srujankothuri.streamlit.app |
| 💻 GitHub Repository | github.com/srujankothuri/TaxiPulse |

📸 Screenshots

Landing Page
[Screenshot: Landing Page]

📊 Pipeline Overview
[Screenshot: Pipeline Overview]

🗺️ Analytics Explorer
[Screenshots: Analytics Explorer 1, Analytics Explorer 2]

🚨 Anomaly Monitor
[Screenshots: Anomaly Monitor 1, Anomaly Monitor 2]

✅ Data Quality Report
[Screenshots: Data Quality Report 1, Data Quality Report 2]

🔄 Airflow DAGs
[Screenshots: Batch Pipeline DAG, Streaming Demo DAG]

🪣 MinIO Data Lake
[Screenshot: MinIO Bronze Layer]

Slack Alerts
[Screenshot: Slack]


🏗️ Architecture

NYC TLC Data ──┬── Batch Path (Airflow) ──┐
               │                          ├── MinIO (Bronze) ── Quality Checks
               └── Stream Path (Kafka) ───┘         │
                                                    ▼
                                       PostgreSQL (Silver → Gold)
                                                    │
                                         ┌──────────┼──────────┐
                                         ▼          ▼          ▼
                                    Star Schema  Anomaly    Streamlit
                                    Warehouse    Detection  Dashboard
                                                 + Alerts
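The PostgreSQL (Silver → Gold) step pre-computes aggregations for the warehouse and dashboard. As a rough illustration of what an hourly Gold roll-up involves, here is a minimal plain-Python sketch. The field names (pickup_datetime, pickup_zone_id, fare_amount) and the choice to group by pickup hour and pickup zone are assumptions; the project itself may compute its Gold tables differently, for example in SQL inside PostgreSQL.

```python
from collections import defaultdict
from datetime import datetime


def hourly_aggregates(trips):
    """Roll Silver trips up to one row per (pickup hour, pickup zone).

    Each trip is a dict with hypothetical keys: 'pickup_datetime' (datetime),
    'pickup_zone_id' (int) and 'fare_amount' (float).
    """
    buckets = defaultdict(lambda: {"trip_count": 0, "total_fare": 0.0})
    for trip in trips:
        # Truncate the pickup timestamp to the start of its hour.
        hour = trip["pickup_datetime"].replace(minute=0, second=0, microsecond=0)
        bucket = buckets[(hour, trip["pickup_zone_id"])]
        bucket["trip_count"] += 1
        bucket["total_fare"] += trip["fare_amount"]

    # Emit one row per bucket, ordered by (hour, zone).
    return [
        {
            "pickup_hour": hour,
            "pickup_zone_id": zone,
            "trip_count": agg["trip_count"],
            "avg_fare": round(agg["total_fare"] / agg["trip_count"], 2),
        }
        for (hour, zone), agg in sorted(buckets.items(), key=lambda item: item[0])
    ]


# Sample Silver rows (illustrative values only).
trips = [
    {"pickup_datetime": datetime(2024, 1, 1, 8, 5), "pickup_zone_id": 161, "fare_amount": 12.0},
    {"pickup_datetime": datetime(2024, 1, 1, 8, 40), "pickup_zone_id": 161, "fare_amount": 18.0},
    {"pickup_datetime": datetime(2024, 1, 1, 9, 10), "pickup_zone_id": 237, "fare_amount": 9.5},
]
for row in hourly_aggregates(trips):
    print(row)
```

The first two trips fall in the same hour and zone, so they collapse into a single Gold row with a trip count of 2.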

✨ Key Features

  • Dual Ingestion: Batch (Airflow) + Real-time streaming (Kafka) pipelines
  • Medallion Architecture: Bronze → Silver → Gold data layers
  • Automated Data Quality: 18 validation checks with quarantine system (98.6% pass rate)
  • Star Schema Warehouse: Fact table + 5 dimensions + pre-computed aggregations
  • Anomaly Detection: Z-score based fare/volume spike detection (3,623 anomalies found)
  • Slack Alerting: Real-time notifications for critical anomalies
  • Interactive Dashboard: 4-page Streamlit app with 15+ charts
  • Fully Containerized: 8 Docker services, all started with a single docker-compose up
  • Tested: 38 pytest tests with GitHub Actions CI/CD
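The Z-score rule behind the anomaly detection feature flags a value x when |x - μ| / σ exceeds a threshold, where μ and σ are the mean and standard deviation of the series. Below is a minimal sketch that uses Python's standard-library statistics module in place of SciPy/NumPy. It assumes a threshold of 3 and a hypothetical "critical" cutoff of 4; the project's actual thresholds, windows, and monitored metrics are not specified here.

```python
from statistics import mean, pstdev


def zscore_anomalies(values, threshold=3.0, critical_threshold=4.0):
    """Flag points whose absolute Z-score exceeds `threshold`.

    z = (x - mean) / std, using the population standard deviation.
    `critical_threshold` is a hypothetical cutoff for the "critical" label.
    Returns a list of (index, value, z, severity) tuples.
    """
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return []  # a constant series has no outliers
    flagged = []
    for i, x in enumerate(values):
        z = (x - mu) / sigma
        if abs(z) > threshold:
            severity = "critical" if abs(z) > critical_threshold else "warning"
            flagged.append((i, x, round(z, 2), severity))
    return flagged


# Synthetic example: 23 ordinary hours of trip counts for one zone, then a spike.
hourly_counts = [100] * 23 + [400]
for index, value, z, severity in zscore_anomalies(hourly_counts):
    print(f"hour {index}: {value} trips, z = {z}, {severity}")
```

One design consideration: with the population standard deviation, no single point in a window of n values can exceed |z| = √(n − 1). A threshold of 3 therefore requires at least 11 points per window before anything can be flagged.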

📊 Key Metrics

| Metric | Value |
|---|---|
| Total records processed | 9,554,778 |
| Quality pass rate | 98.6% |
| Clean Silver records | 9,417,374 |
| Quarantined records | 137,403 |
| Anomalies detected | 3,623 (1,478 critical) |
| Star schema dimensions | 5 |
| Hourly aggregations | 240,716 |
| Docker services | 8 |
| Pytest tests | 38 |
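The quality pass rate agrees with the ratio of clean Silver records to total records processed. Here is a quick arithmetic check using only the figures from the table:

```python
# Figures copied from the Key Metrics table.
total_records = 9_554_778
clean_silver = 9_417_374
quarantined = 137_403

pass_rate = clean_silver / total_records
quarantine_rate = quarantined / total_records

print(f"pass rate:       {pass_rate:.1%}")
print(f"quarantine rate: {quarantine_rate:.1%}")
```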

🛠️ Tech Stack

| Component | Technology |
|---|---|
| Orchestration | Apache Airflow |
| Streaming | Apache Kafka |
| Object Storage | MinIO (S3-compatible) |
| Data Warehouse | PostgreSQL |
| Data Quality | Custom Python engine (18 checks) |
| Anomaly Detection | Python (SciPy, NumPy): Z-score |
| Alerting | Slack Webhooks |
| Containerization | Docker + Docker Compose |
| Visualization | Streamlit + Plotly |
| Testing | pytest + GitHub Actions CI/CD |
| Language | Python 3.11+ |
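Slack incoming webhooks accept an HTTP POST with a JSON body whose text field holds the message. The following minimal standard-library sketch builds and sends an anomaly alert. The message format, the build_slack_payload and send_slack_alert helpers, and the SLACK_WEBHOOK_URL variable name are hypothetical and are not taken from the repository.

```python
import json
import urllib.request


def build_slack_payload(zone, metric, value, zscore):
    """Format an anomaly as a Slack incoming-webhook message body (hypothetical format)."""
    text = (
        f":rotating_light: Critical anomaly in zone {zone}: "
        f"{metric} = {value} (z = {zscore:.2f})"
    )
    return {"text": text}


def send_slack_alert(webhook_url, payload, timeout=10):
    """POST the payload as JSON to the webhook URL and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


payload = build_slack_payload(161, "trip_count", 400, 4.7958)
print(json.dumps(payload))
# To actually send it (SLACK_WEBHOOK_URL is a hypothetical variable name):
# send_slack_alert(os.environ["SLACK_WEBHOOK_URL"], payload)
```

Keeping payload construction separate from the network call makes the message format testable without a live webhook.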

📂 Project Structure

TaxiPulse/
├── airflow/                  # Airflow DAGs (batch 7 tasks + streaming 2 tasks)
│   └── dags/
├── ingestion/                # Data ingestion (batch + Kafka streaming)
│   ├── batch/
│   └── streaming/
├── transformations/          # Bronze → Silver → Gold transformations
│   ├── bronze/
│   ├── silver/
│   └── gold/
├── quality/                  # Data quality engine (18 expectations)
│   └── expectations/
├── anomaly_detection/        # Z-score anomaly detection + Slack alerting
├── streamlit_app/            # 4-page monitoring dashboard
│   ├── pages/
│   └── data/                 # Exported CSVs for cloud deployment
├── scripts/                  # Pipeline runner scripts
├── tests/                    # 38 pytest tests (4 modules)
├── docker/                   # Dockerfile for Airflow
├── docs/                     # Documentation and screenshots
├── .github/workflows/        # GitHub Actions CI/CD
├── docker-compose.yml        # 8-service Docker infrastructure
├── Makefile                  # Convenience commands
└── README.md

🚀 Quick Start

Prerequisites

  • Docker Desktop (4GB+ RAM)
  • Python 3.11+

Setup & Run

# 1. Clone
git clone https://github.com/srujankothuri/TaxiPulse.git
cd TaxiPulse

# 2. Create virtual environment
python -m venv venv
source venv/bin/activate  # Mac/Linux

# 3. Install dependencies
pip install -r requirements.txt

# 4. Configure
cp .env.example .env
# Edit .env: set MINIO_ENDPOINT=localhost:9000, POSTGRES_HOST=localhost, KAFKA_BOOTSTRAP_SERVERS=localhost:29092

# 5. Start infrastructure
docker-compose up -d

# 6. Run complete pipeline (~50 min)
make pipeline

# 7. Load zone names
python scripts/load_zone_names.py

# 8. Launch dashboard
python -m streamlit run streamlit_app/app.py
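Step 4 points the host-side scripts at the Docker services through localhost. Below is a small hypothetical helper, not part of the repository, that checks a .env file for the three values step 4 lists. It uses a deliberately simple KEY=VALUE parser instead of python-dotenv, so it ignores export prefixes and multi-line values.

```python
# Values that step 4 asks you to set for running scripts from the host.
EXPECTED = {
    "MINIO_ENDPOINT": "localhost:9000",
    "POSTGRES_HOST": "localhost",
    "KAFKA_BOOTSTRAP_SERVERS": "localhost:29092",
}


def parse_env(text):
    """Parse KEY=VALUE lines, skipping blank lines and # comments."""
    settings = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip().strip('"').strip("'")
    return settings


def check_local_settings(settings):
    """Return {key: actual_value} for every expected key that is missing or different."""
    return {
        key: settings.get(key)
        for key, wanted in EXPECTED.items()
        if settings.get(key) != wanted
    }


# Example: POSTGRES_HOST still has a container-style hostname.
sample_env = """
# local overrides
MINIO_ENDPOINT=localhost:9000
POSTGRES_HOST=postgres
KAFKA_BOOTSTRAP_SERVERS=localhost:29092
"""
print(check_local_settings(parse_env(sample_env)))
```

To check a real file, pass the contents of .env (for example via pathlib.Path(".env").read_text()) to parse_env; an empty result means all three values match.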

Access Points

| Service | URL | Credentials |
|---|---|---|
| Streamlit Dashboard | http://localhost:8501 | (none) |
| Airflow UI | http://localhost:8080 | admin / admin |
| MinIO Console | http://localhost:9001 | taxipulse / taxipulse123 |
| PostgreSQL | localhost:5432 | taxipulse / taxipulse123 |

Available Commands

make help          # Show all commands
make up            # Start Docker services
make down          # Stop Docker services
make pipeline      # Run full batch pipeline
make streaming     # Run Kafka streaming demo
make test          # Run 38 tests
make dashboard     # Launch Streamlit

📊 Data Model (Star Schema)

                    ┌──────────────┐
                    │ dim_datetime │
                    └──────┬───────┘
                           │
┌──────────────────┐  ┌────┴─────┐  ┌──────────────────┐
│ dim_pickup_loc   ├──┤fact_trips├──┤ dim_dropoff_loc  │
└──────────────────┘  └──┬────┬──┘  └──────────────────┘
                         │    │
              ┌──────────┘    └──────────┐
              ▼                          ▼
     ┌────────────────┐       ┌──────────────────┐
     │ dim_payment    │       │ dim_rate_code    │
     └────────────────┘       └──────────────────┘
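To illustrate how the star schema is queried, here is a self-contained sketch that uses Python's built-in sqlite3 module as a stand-in for PostgreSQL. It models only fact_trips, dim_pickup_loc, and dim_payment. All column names (location_key, payment_key, fare_amount, borough, and so on) and the sample rows are assumptions, and the project's actual DDL may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Hypothetical, trimmed-down versions of one fact table and two dimensions.
conn.executescript("""
CREATE TABLE dim_payment (payment_key INTEGER PRIMARY KEY, payment_type TEXT);
CREATE TABLE dim_pickup_loc (location_key INTEGER PRIMARY KEY, borough TEXT, zone TEXT);
CREATE TABLE fact_trips (
    trip_id INTEGER PRIMARY KEY,
    pickup_location_key INTEGER REFERENCES dim_pickup_loc(location_key),
    payment_key INTEGER REFERENCES dim_payment(payment_key),
    fare_amount REAL
);
INSERT INTO dim_payment VALUES (1, 'Credit card'), (2, 'Cash');
INSERT INTO dim_pickup_loc VALUES (1, 'Manhattan', 'Midtown Center'), (2, 'Queens', 'JFK Airport');
INSERT INTO fact_trips VALUES (1, 1, 1, 12.5), (2, 1, 2, 9.0), (3, 2, 1, 70.0);
""")

# Typical star-schema query: join the fact table to its dimensions, then aggregate.
query = """
SELECT l.borough, p.payment_type, COUNT(*) AS trips, ROUND(AVG(f.fare_amount), 2) AS avg_fare
FROM fact_trips AS f
JOIN dim_pickup_loc AS l ON f.pickup_location_key = l.location_key
JOIN dim_payment AS p ON f.payment_key = p.payment_key
GROUP BY l.borough, p.payment_type
ORDER BY l.borough, p.payment_type
"""
rows = conn.execute(query).fetchall()
for row in rows:
    print(row)
conn.close()
```

The fact table stores only foreign keys and measures, so descriptive attributes such as borough or payment type come from the dimensions at query time.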

🔄 Pipeline Flow

Batch:     Download → MinIO → Bronze → Validate → Silver → Gold → Anomaly Detection
Streaming: Kafka Producer → Kafka Topic → Consumer → Validate → Silver

Both paths feed into the same Silver layer. The Gold layer then processes all Silver data, regardless of which path it arrived through.
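Because both paths pass through the Validate step before Silver, a single validation function can serve batch files and streamed messages alike. Below is a minimal sketch of a validate-and-quarantine split. The three checks and all field names are hypothetical examples, not the project's 18 expectations, and each quarantined row keeps the names of the checks it failed.

```python
from dataclasses import dataclass, field
from datetime import datetime


# Hypothetical example checks; the project's quality engine defines its own 18.
def fare_non_negative(trip):
    return trip.get("fare_amount") is not None and trip["fare_amount"] >= 0


def distance_positive(trip):
    return trip.get("trip_distance") is not None and trip["trip_distance"] > 0


def dropoff_after_pickup(trip):
    pickup, dropoff = trip.get("pickup_datetime"), trip.get("dropoff_datetime")
    return pickup is not None and dropoff is not None and dropoff > pickup


CHECKS = [fare_non_negative, distance_positive, dropoff_after_pickup]


@dataclass
class ValidationResult:
    clean: list = field(default_factory=list)        # rows bound for Silver
    quarantined: list = field(default_factory=list)  # (row, names of failed checks)


def validate(records):
    """Split records into clean and quarantined rows, keeping failure reasons."""
    result = ValidationResult()
    for record in records:
        failed = [check.__name__ for check in CHECKS if not check(record)]
        if failed:
            result.quarantined.append((record, failed))
        else:
            result.clean.append(record)
    return result


batch_rows = [
    {"fare_amount": 14.5, "trip_distance": 2.1,
     "pickup_datetime": datetime(2024, 1, 1, 8, 0),
     "dropoff_datetime": datetime(2024, 1, 1, 8, 15)},
    {"fare_amount": -3.0, "trip_distance": 0.0,
     "pickup_datetime": datetime(2024, 1, 1, 9, 0),
     "dropoff_datetime": datetime(2024, 1, 1, 9, 5)},
]
batch_result = validate(batch_rows)
stream_result = validate([batch_rows[0]])  # e.g. one decoded Kafka consumer message
print(len(batch_result.clean), len(batch_result.quarantined))
print(batch_result.quarantined[0][1])
```

Storing the failed check names with each quarantined row makes per-check failure counts easy to report later.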


📄 License

This project is licensed under the MIT License; see the LICENSE file for details.


👤 Author

Venkata Srujan Kothuri
