Flow is a PHP-based, strongly typed ETL (Extract Transform Load), asynchronous data processing library with constant memory consumption.
Stand With Ukraine | Stand With Us |
On Feb. 24, 2022, Russia declared an unprovoked war on Ukraine and launched a full-scale invasion. Russia is currently bombing peaceful Ukrainian cities, including schools and hospitals and attacking civilians who are fleeing conflict zones.
On Oct. 7, 2023, the national holiday of Simchat Torah, Hamas terrorists initiated an attack on Israel in the early hours, targeting civilians. They unleashed violence that resulted in at least 1,400 casualties and abducted at least 200 individuals, not limited to Israelis.
<?php
declare(strict_types=1);
use Flow\ETL\DSL\Parquet;
use Flow\ETL\Filesystem\SaveMode;
use function Flow\ETL\DSL\{lit, ref, sum};
use Flow\ETL\DSL\To;
use Flow\ETL\Flow;
require __DIR__ . '/vendor/autoload.php';
(new Flow())
    ->read(Parquet::from(__FLOW_DATA__ . '/orders_flow.parquet'))
    ->select('created_at', 'total_price', 'discount')
    ->withEntry('created_at', ref('created_at')->toDate(\DateTime::RFC3339)->dateFormat('Y/m'))
    ->withEntry('revenue', ref('total_price')->minus(ref('discount')))
    ->select('created_at', 'revenue')
    ->groupBy('created_at')
    ->aggregate(sum(ref('revenue')))
    ->sortBy(ref('created_at')->desc())
    ->withEntry('daily_revenue', ref('revenue_sum')->round(lit(2))->numberFormat(lit(2)))
    ->drop('revenue_sum')
    ->write(To::output(truncate: false))
    ->mode(SaveMode::Overwrite)
    ->withEntry('created_at', ref('created_at')->toDate('Y/m'))
    ->write(Parquet::to(__FLOW_OUTPUT__ . '/daily_revenue.parquet'))
    ->run();
$ php daily_revenue.php
+------------+---------------+
| created_at | daily_revenue |
+------------+---------------+
| 2023/10 | 206,669.74 |
| 2023/09 | 227,647.47 |
| 2023/08 | 237,027.31 |
| 2023/07 | 240,111.05 |
| 2023/06 | 225,536.35 |
| 2023/05 | 234,624.74 |
| 2023/04 | 231,472.05 |
| 2023/03 | 231,697.36 |
| 2023/02 | 211,048.97 |
| 2023/01 | 225,539.81 |
+------------+---------------+
10 rows
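To make the steps of the pipeline above easier to follow, here is a rough plain-PHP equivalent of the same computation: parse the date, compute revenue, group by month, sum, sort descending, and format. It is only a sketch of the logic. It does not use Flow, it works on an in-memory array, and the function name `monthlyRevenue` and the sample orders are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper, not part of Flow: sums (total_price - discount) per month.
 *
 * @param iterable<array{created_at: string, total_price: float, discount: float}> $orders
 * @return array<string, string> month ("Y/m") => formatted revenue, newest month first
 */
function monthlyRevenue(iterable $orders): array
{
    $sums = [];
    foreach ($orders as $order) {
        // Same idea as toDate(RFC3339)->dateFormat('Y/m') in the pipeline.
        $date = \DateTimeImmutable::createFromFormat(\DateTimeInterface::RFC3339, $order['created_at']);
        if ($date === false) {
            throw new \InvalidArgumentException('Invalid date: ' . $order['created_at']);
        }
        $month = $date->format('Y/m');
        // revenue = total_price - discount, accumulated per group.
        $sums[$month] = ($sums[$month] ?? 0.0) + ($order['total_price'] - $order['discount']);
    }
    // Sort groups by key, descending ("2023/10" before "2023/09").
    krsort($sums, SORT_STRING);
    // Round to 2 decimals and format with a thousands separator.
    return array_map(static fn (float $sum): string => number_format(round($sum, 2), 2), $sums);
}

// Made-up sample data, for illustration only.
$orders = [
    ['created_at' => '2023-09-03T10:00:00+00:00', 'total_price' => 1200.50, 'discount' => 200.50],
    ['created_at' => '2023-10-01T08:30:00+00:00', 'total_price' => 100.00, 'discount' => 0.25],
    ['created_at' => '2023-09-15T12:00:00+00:00', 'total_price' => 50.00, 'discount' => 0.00],
];

foreach (monthlyRevenue($orders) as $month => $revenue) {
    echo $month, ' ', $revenue, PHP_EOL;
}
```

Unlike this sketch, the Flow pipeline reads the Parquet file lazily and writes the result to both the console and a new Parquet file.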
- low and constant memory consumption
- reading from any data source
- writing to any data source
- rich collection of data transformation functions
- direct access to remote filesystems
- partitioning
- grouping & aggregating
- remote file processing
- joins
- sorting
- displaying datasets as ASCII table
- validation against the schema
- window functions
- caching
This package is a monorepo. Please review the packages below and install only those you are going to use. This reduces the number of unnecessary dependencies in your project (less maintenance).
- ETL
- Adapters
- Libraries
For example, to work with JSON/CSV files, you will need to install the following dependencies:
composer require flow-php/etl:^0.1 flow-php/etl-adapter-csv:^0.1 flow-php/etl-adapter-json:^0.1
Since some of the Flow adapters require additional PHP extensions, we have prepared a Docker image with all the necessary dependencies.
$ docker pull ghcr.io/flow-php/flow:latest
$ docker run -v $(pwd):/flow-workspace --rm -it ghcr.io/flow-php/flow:latest
Flow-PHP - Extract Transform Load - Data processing framework 0.4.0-325-g6c3e4404
Usage:
command [options] [arguments]
Options:
-h, --help Display help for the given command. When no command is given display help for the list command
-q, --quiet Do not output any message
-V, --version Display this application version
--ansi|--no-ansi Force (or disable --no-ansi) ANSI output
-n, --no-interaction Do not ask any interactive question
-v|vv|vvv, --verbose Increase the verbosity of messages: 1 for normal output, 2 for more verbose output and 3 for debug
Available commands:
completion Dump the shell completion script
help Display help for a command
list List commands
run Run ETL pipeline
parquet
parquet:read:data Read data from parquet file
parquet:read:metadata Read metadata from parquet file
If you would like to try Flow, fork this repository, navigate to it in a command-line interface, and execute the following command:
$ docker run -v $(pwd):/flow-workspace --rm -it ghcr.io/flow-php/flow:latest run /flow-workspace/examples/topics/aggregations/daily_revenue.php
The Flow CLI will read the pipeline definition from the examples/topics/aggregations/daily_revenue.php file and execute it.
To understand how Flow works, please read the documentation. The key concepts are:
- DataFrame - Lazy data processing frame.
- Rows - Immutable collection of Row objects.
- Row - Immutable, strongly typed collection of Entry objects.
- Entry - Immutable, strongly typed object representing a cell in a row.
- Extractor (Reader) - Memory-safe data source returning a \Generator that yields Rows to the Pipeline.
- Transformer - Data transformer receiving and returning Rows (in most cases transformed), one instance of Rows at a time.
- Loader (Writer) - Memory-safe representation of a data sink; the responsibility of a Loader is to write Rows into destination storage, one at a time.
- Pipeline - Interface representing the ETL process; each received Rows instance is passed through all Pipes. Also responsible for error handling.
- Pipe - Loader or Transformer instance existing in the Pipes collection.
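
The roles described above can be illustrated with a minimal plain-PHP sketch. This is a conceptual illustration under stated assumptions, not Flow's actual classes or interfaces: the function names (`extractNumbers`, `doubleBatch`, `loadBatch`, `runPipeline`) are hypothetical, and each plain array "batch" stands in for one Rows instance.

```php
<?php
declare(strict_types=1);

// Conceptual sketch only: these functions are hypothetical and are NOT Flow's classes.
// Each "batch" (a plain array) plays the role of one Rows instance.

/** Extractor: yields small batches lazily instead of loading everything at once. */
function extractNumbers(int $total, int $batchSize): \Generator
{
    for ($start = 1; $start <= $total; $start += $batchSize) {
        yield range($start, min($start + $batchSize - 1, $total));
    }
}

/** Transformer: receives one batch and returns a new, transformed batch. */
function doubleBatch(array $batch): array
{
    return array_map(static fn (int $n): int => $n * 2, $batch);
}

/** Loader: writes one batch at a time to the destination (here, an open stream). */
function loadBatch(array $batch, $stream): void
{
    foreach ($batch as $value) {
        fwrite($stream, $value . PHP_EOL);
    }
}

/** Pipeline: pulls batches from the extractor and passes each one through every pipe. */
function runPipeline(iterable $extractor, array $transformers, callable $loader): int
{
    $batches = 0;
    foreach ($extractor as $batch) {
        foreach ($transformers as $transform) {
            $batch = $transform($batch);
        }
        $loader($batch);
        $batches++;
    }
    return $batches;
}

$out = fopen('php://memory', 'w+');
$batches = runPipeline(
    extractNumbers(10, 4),
    ['doubleBatch'],
    static fn (array $batch) => loadBatch($batch, $out),
);

// Print what the loader wrote: the numbers 2, 4, ..., 20, one per line.
rewind($out);
echo stream_get_contents($out);
```

Because the extractor is a generator, only one batch is held in memory at any moment, regardless of how many batches the source produces. That is the general idea behind the constant memory consumption claim, shown here in a much simplified form.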
Flow PHP is sponsored by:
- Blackfire - the best PHP profiling and monitoring tool!