{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n", "\n", "# 4 - Parquet Datasets\n", "\n", "awswrangler has 3 different write modes to store Parquet Datasets on Amazon S3.\n", "\n", "- **append** (Default)\n", "\n", " Only adds new files without any delete.\n", " \n", "- **overwrite**\n", "\n", " Deletes everything in the target directory and then add new files. If writing new files fails for any reason, old files are _not_ restored.\n", " \n", "- **overwrite_partitions** (Partition Upsert)\n", "\n", " Only deletes the paths of partitions that should be updated and then writes the new partitions files. It's like a \"partition Upsert\"." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from datetime import date\n", "\n", "import pandas as pd\n", "\n", "import awswrangler as wr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your bucket name:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdin", "output_type": "stream", "text": [ " ············\n" ] } ], "source": [ "import getpass\n", "\n", "bucket = getpass.getpass()\n", "path = f\"s3://{bucket}/dataset/\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating the Dataset" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idvaluedate
01foo2020-01-01
12boo2020-01-02
\n", "
" ], "text/plain": [ " id value date\n", "0 1 foo 2020-01-01\n", "1 2 boo 2020-01-02" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\"id\": [1, 2], \"value\": [\"foo\", \"boo\"], \"date\": [date(2020, 1, 1), date(2020, 1, 2)]})\n", "\n", "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\")\n", "\n", "wr.s3.read_parquet(path, dataset=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Appending" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idvaluedate
03bar2020-01-03
11foo2020-01-01
22boo2020-01-02
\n", "
" ], "text/plain": [ " id value date\n", "0 3 bar 2020-01-03\n", "1 1 foo 2020-01-01\n", "2 2 boo 2020-01-02" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\"id\": [3], \"value\": [\"bar\"], \"date\": [date(2020, 1, 3)]})\n", "\n", "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"append\")\n", "\n", "wr.s3.read_parquet(path, dataset=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overwriting" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idvaluedate
03bar2020-01-03
\n", "
" ], "text/plain": [ " id value date\n", "0 3 bar 2020-01-03" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\")\n", "\n", "wr.s3.read_parquet(path, dataset=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a **Partitioned** Dataset" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idvaluedate
01foo2020-01-01
12boo2020-01-02
\n", "
" ], "text/plain": [ " id value date\n", "0 1 foo 2020-01-01\n", "1 2 boo 2020-01-02" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\"id\": [1, 2], \"value\": [\"foo\", \"boo\"], \"date\": [date(2020, 1, 1), date(2020, 1, 2)]})\n", "\n", "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\", partition_cols=[\"date\"])\n", "\n", "wr.s3.read_parquet(path, dataset=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Upserting partitions (overwrite_partitions)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idvaluedate
01foo2020-01-01
12xoo2020-01-02
23bar2020-01-03
\n", "
" ], "text/plain": [ " id value date\n", "0 1 foo 2020-01-01\n", "1 2 xoo 2020-01-02\n", "2 3 bar 2020-01-03" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\"id\": [2, 3], \"value\": [\"xoo\", \"bar\"], \"date\": [date(2020, 1, 2), date(2020, 1, 3)]})\n", "\n", "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite_partitions\", partition_cols=[\"date\"])\n", "\n", "wr.s3.read_parquet(path, dataset=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## BONUS - Glue/Athena integration" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idvaluedate
01foo2020-01-01
12boo2020-01-02
\n", "
" ], "text/plain": [ " id value date\n", "0 1 foo 2020-01-01\n", "1 2 boo 2020-01-02" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df = pd.DataFrame({\"id\": [1, 2], \"value\": [\"foo\", \"boo\"], \"date\": [date(2020, 1, 1), date(2020, 1, 2)]})\n", "\n", "wr.s3.to_parquet(df=df, path=path, dataset=True, mode=\"overwrite\", database=\"aws_sdk_pandas\", table=\"my_table\")\n", "\n", "wr.athena.read_sql_query(\"SELECT * FROM my_table\", database=\"aws_sdk_pandas\")" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.14", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.14" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 4 }