Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sk] Improved PostgreSQL Data Loader #651

Merged
merged 17 commits into from
Jul 14, 2022
Prev Previous commit
Next Next commit
[sk] Some more bug fixes
  • Loading branch information
skunichetty committed Jul 14, 2022
commit 28142209a6379f149bc897ec8482a00b24c1a11f
13 changes: 6 additions & 7 deletions mage_ai/io/export_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from enum import Enum
from pandas.api.types import infer_dtype
from pandas import DataFrame, Series
from mage_ai.data_cleaner.shared.utils import clean_name
from typing import Callable, Dict, Mapping, Tuple
import numpy as np
from pandas import DataFrame, Series
from pandas.api.types import infer_dtype
from typing import Callable, Dict, Mapping

"""
Utilities for exporting Python data frames to external databases.
Expand Down Expand Up @@ -34,13 +33,13 @@ class PandasTypes(str, Enum):
INTEGER = 'integer'
FLOATING = 'floating'
MIXED = 'mixed'
MIXED_INTEGER = 'mixed=integer'
MIXED_INTEGER = 'mixed-integer'
MIXED_INTEGER_FLOAT = 'mixed-integer-float'
PERIOD = 'period'
STRING = 'string'
TIME = 'time'
TIMEDELTA64 = 'timedelta64'
TIMEDELTA = 'timedelta'
TIMEDELTA64 = 'timedelta64'
UNKNOWN_ARRAY = 'unknown-array'


Expand Down Expand Up @@ -98,5 +97,5 @@ def gen_table_creation_query(
"""
query = []
for cname in dtypes:
query.append(f'{clean_name(cname)} {dtypes[cname]}')
query.append(f'"{clean_name(cname)}" {dtypes[cname]}')
return f'CREATE TABLE {table_name} (' + ','.join(query) + ');'
9 changes: 7 additions & 2 deletions mage_ai/io/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,10 @@ def export(

df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cur.copy_from(buffer, table=table_name, sep=',', null='', **kwargs)
cur.copy_expert(
f'COPY {table_name} FROM STDIN (FORMAT csv, DELIMITER \',\', NULL \'\');',
buffer,
)
self.conn.commit()

def __table_exists(self, table_name: str) -> bool:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here: you need both schema_name and table_name to identify a table.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added schema name to table existence check.

Expand Down Expand Up @@ -177,7 +180,9 @@ def get_type(self, column: Series, dtype: str) -> str:
PandasTypes.UNKNOWN_ARRAY,
PandasTypes.COMPLEX,
):
raise BadConversionError(f'Cannot convert {dtype} to a PostgreSQL datatype.')
raise BadConversionError(
f'Cannot convert column \'{column.name}\' with data type \'{dtype}\' to a PostgreSQL datatype.'
)
elif dtype in (PandasTypes.DATETIME, PandasTypes.DATETIME64):
try:
if column.dt.tz:
Expand Down