
AirBend 🌀

A pipeline for processing air pollution data. A short project for learning Databend and dbt.

AirBend ingests data from the London Air Quality Network (LAQN) API into Databend, then uses dbt to process the raw data for analysis.

Getting started

Dependencies:

Database - Databend

Databend is an open-source alternative to Snowflake. Because it can be self-hosted it doesn't require a cloud subscription, and it supports MinIO as a storage backend.

Ensure you have Docker Compose installed, then start MinIO and Databend locally:

docker compose up -d

Configure Minio

Go to http://localhost:9001 and sign in with:

username: ROOTUSER
password: CHANGEME123

Then create a new bucket called databend. Databend will store tables and other data in this bucket.

Sign in to bendsql

bendsql -u databend -p databend

Check that you can run a query against Databend:

SELECT VERSION();
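The dbt profile later in this guide targets a schema named airbend. Depending on the adapter it may be created automatically on the first run, but you can create it up front from bendsql to be safe (an assumption, not a required step):

```sql
-- Create the schema the dbt models will live in (idempotent)
CREATE DATABASE IF NOT EXISTS airbend;
```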

dbt

These instructions use uv for Python virtual environment management. Install it, or adapt the commands for your own virtual environment tool.

Create a new virtual environment:

uv venv
source .venv/bin/activate

Build the databend-driver from source

We need to install the bendsql Python bindings for dbt to work with Databend. On an Intel Mac I had to compile these from source.

Make sure you have the latest Rust toolchain installed.

Then install maturin:

uv pip install maturin

Clone bendsql:

git clone https://github.com/datafuselabs/bendsql /tmp/bendsql

Build from source:

(
  cd /tmp/bendsql/bindings/python
  maturin build
)

Note the location of the built wheel; the exact filename varies by driver version and platform. For example: /private/tmp/bendsql/target/wheels/databend_driver-0.20.1-cp37-abi3-macosx_10_12_x86_64.whl

and finally install it into your virtual environment:

uv pip install /private/tmp/bendsql/target/wheels/databend_driver-0.20.1-cp37-abi3-macosx_10_12_x86_64.whl

Install remaining dependencies

uv pip install setuptools dbt-core dbt-databend-cloud

Create a connection profile (the ~/.dbt directory must exist before writing to it):

mkdir -p ~/.dbt
echo "airbend_pipeline:
  outputs:
    dev:
      host: localhost
      pass: databend
      port: 8000
      schema: airbend
      secure: false
      type: databend
      user: databend
  target: dev
" > ~/.dbt/profiles.yml
(
  cd airbend_pipeline
  uv run dbt debug
)

Everything should pass.

Airbend-ingest

Airbend-ingest is a CLI tool for extracting data from the LAQN API.

It consists of three components:

  • airbend-ingest: A CLI for making concurrent requests to the LAQN API and inserting the results into Databend.
  • airbend_table: A library that maps Rust types to SQL for inserting into Databend. It allows you to define Databend tables using a derive macro.
  • airbend_table_derive: The custom derive macro used by airbend_table.

For example, we can define a table and create it like so:

// Create a struct to represent the databend table and annotate it with databend types.
#[derive(AirbendTable)]
#[airbend_table(table_name = "raw_sensor_reading")]
pub struct FlatSensorReading {
    #[airbend_col(dtype = "TIMESTAMP")]
    pub scrape_time: jiff::Timestamp,
    #[airbend_col(dtype = "VARCHAR")]
    pub site_code: String,
    #[airbend_col(dtype = "VARCHAR")]
    pub measurement_date: String,
    #[airbend_col(dtype = "VARCHAR")]
    pub species_code: Option<String>,
    #[airbend_col(dtype = "VARCHAR")]
    pub value: Option<String>,
}

// Create the table
create::<FlatSensorReading>(&*conn).await?;

Install

Assuming you have a Rust toolchain installed:

cargo install --git https://github.com/OscartGiles/AirBend

Get some help:

airbend-ingest --help

and finally fetch data for a date range and insert it into Databend:

airbend-ingest --start-date 2024-09-01 --end-date 2024-10-02 --max-concurrent-connections 10

Run the dbt pipeline

Once you have ingested some data, run the pipeline:

cd airbend_pipeline
uv run dbt run
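As a quick sanity check, you can count the ingested rows from bendsql. This assumes the raw table from the airbend_table example above was created in the airbend schema; adjust the schema name if yours differs:

```sql
-- Rough row count of the raw LAQN sensor readings
SELECT COUNT(*) FROM airbend.raw_sensor_reading;
```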
