A pipeline for processing air pollution data. A short project to learn Databend and dbt.
Airbend ingests data from the London Air Quality Network (LAQN) API into Databend and then uses dbt to process the raw data for analysis.
Databend is an open-source alternative to Snowflake. Because it can be self-hosted, it doesn't require a cloud subscription, and it supports MinIO as a storage backend.
Ensure you have Docker Compose installed and then start MinIO and Databend locally:
docker compose up -d
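The docker-compose.yml in the repo is authoritative, but for orientation the wiring looks roughly like this (a sketch only: the image names and environment variables follow Databend's and MinIO's Docker docs, and may differ from what the repo actually uses):
# Sketch of the kind of Compose setup involved - not the repo's file.
services:
  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: ROOTUSER
      MINIO_ROOT_PASSWORD: CHANGEME123
  databend:
    image: datafuselabs/databend
    depends_on:
      - minio
    ports:
      - "8000:8000"
    environment:
      QUERY_DEFAULT_USER: databend
      QUERY_DEFAULT_PASSWORD: databend
      QUERY_STORAGE_TYPE: s3
      AWS_S3_ENDPOINT: http://minio:9000
      AWS_S3_BUCKET: databend
      AWS_ACCESS_KEY_ID: ROOTUSER
      AWS_SECRET_ACCESS_KEY: CHANGEME123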
Go to http://localhost:9001 and sign in with:
username: ROOTUSER
password: CHANGEME123
Then create a new bucket called databend. Databend will store tables and other data in this bucket.
Connect to Databend with bendsql:
bendsql -u databend -p databend
Check you can run a command against Databend:
SELECT VERSION();
These instructions use uv for Python virtual environment management. Install uv or adapt the commands for your own virtual environment tool.
Create and activate a new virtual environment:
uv venv
source .venv/bin/activate
We need to install the bendsql Python bindings for dbt to work with Databend. On an Intel Mac I had to compile these from source.
Make sure you have the latest Rust toolchain installed.
Then install maturin:
uv pip install maturin
Clone bendsql:
git clone https://github.com/datafuselabs/bendsql /tmp/bendsql
Build from source:
(
cd /tmp/bendsql/bindings/python
maturin build
)
Note the location of the build. For example: /private/tmp/bendsql/target/wheels/databend_driver-0.20.1-cp37-abi3-macosx_10_12_x86_64.whl
and finally install it into your virtual environment:
uv pip install /private/tmp/bendsql/target/wheels/databend_driver-0.20.1-cp37-abi3-macosx_10_12_x86_64.whl
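You can sanity-check the install by importing the module the wheel provides (databend_driver):
python -c 'import databend_driver; print("ok")'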
Now install dbt and the Databend adapter:
uv pip install setuptools dbt-core dbt-databend-cloud
Create a connection profile:
mkdir -p ~/.dbt
echo "airbend_pipeline:
  outputs:
    dev:
      host: localhost
      pass: databend
      port: 8000
      schema: airbend
      secure: false
      type: databend
      user: databend
  target: dev
" > ~/.dbt/profiles.yml
Check dbt can connect:
(
cd airbend_pipeline
uv run dbt debug
)
Everything should pass.
Airbend-ingest is a CLI tool for extracting data from the LAQN API.
It consists of three components:
- airbend-ingest: A CLI for making concurrent requests to the LAQN API and inserting the results into Databend.
- airbend_table: A library which maps Rust types to SQL for inserting into Databend. It allows you to define Databend tables using a derive macro.
- airbend_table_derive: A custom derive macro used by airbend_table.
For example, we can define a table and create it like so:
// Create a struct to represent the databend table and annotate it with databend types.
#[derive(AirbendTable)]
#[airbend_table(table_name = "raw_sensor_reading")]
pub struct FlatSensorReading {
#[airbend_col(dtype = "TIMESTAMP")]
pub scrape_time: jiff::Timestamp,
#[airbend_col(dtype = "VARCHAR")]
pub site_code: String,
#[airbend_col(dtype = "VARCHAR")]
pub measurement_date: String,
#[airbend_col(dtype = "VARCHAR")]
pub species_code: Option<String>,
#[airbend_col(dtype = "VARCHAR")]
pub value: Option<String>,
}
// Create the table
create::<FlatSensorReading>(&*conn).await?;
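The derive macro boils down to generating DDL from the annotations, so the create call above issues something roughly like the following (a sketch inferred from the struct, not the macro's literal output; the Option fields are assumed to map to nullable columns):
-- Approximate SQL generated for FlatSensorReading.
CREATE TABLE IF NOT EXISTS raw_sensor_reading (
    scrape_time TIMESTAMP,
    site_code VARCHAR,
    measurement_date VARCHAR,
    species_code VARCHAR NULL,
    value VARCHAR NULL
);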
Assuming you have a Rust toolchain installed:
cargo install --git https://github.com/OscartGiles/AirBend
Get some help:
airbend-ingest --help
and finally fetch data for a date range and insert it into Databend:
airbend-ingest --start-date 2024-09-01 --end-date 2024-10-02 --max-concurrent-connections 10
Once you have ingested some data, run the pipeline:
cd airbend_pipeline
uv run dbt run
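The models in airbend_pipeline take it from here. For a flavour of what such a model does, a hypothetical staging model might cast the raw VARCHAR readings into usable types (illustrative only: the table and column names come from the struct above, but this is not a model from the repo):
-- Hypothetical staging model, not from this repo.
-- to_timestamp and try_cast are Databend SQL functions.
select
    scrape_time,
    site_code,
    species_code,
    to_timestamp(measurement_date) as measured_at,
    try_cast(value as double) as reading
from raw_sensor_reading
where value is not null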