
Database connection serializer in Prefect #16557

Closed
amanchaudhary-95 opened this issue Dec 31, 2024 · 4 comments
Labels
bug Something isn't working

Comments

@amanchaudhary-95

Bug summary

I'm trying to serialize a database connection. I want to make the connection persistent so that I don't need to reconnect on every task run. The issue is that the Prefect cache is not able to serialize the connection. The code is below:

from prefect import flow, task
from prefect.logging import get_run_logger
from datetime import datetime, timedelta
from prefect.cache_policies import TASK_SOURCE

def get_run_name():
    return datetime.now().strftime('%d-%m-%Y %H:%M:%S')

@task(cache_policy=TASK_SOURCE)
def get_pi():
    import PIconnect as PI
    PI.PIConfig.DEFAULT_TIMEZONE = 'Asia/Kolkata'
    DA = PI.PIServer()
    return DA

@task(name='OSIPI_'+get_run_name(), description='Get data Task Run')
def get_data():
    DA = get_pi()
    start_dt = datetime.now() - timedelta(hours=10)
    end_dt = datetime.now()
    frequency = '1h'
    point = DA.search('2000_PM6_valmet_lab_data_roll_no')[0]
    data = (point.interpolated_values(start_dt, end_dt, frequency)
            if end_dt is not None else point.interpolated_value(start_dt))
    return data

@task(name='process_'+get_run_name(), description='Get data Task Run')
def process_data(data):
    print(data.dtypes)

@flow(name='OSIPI get data')
def osipi_get_data():
    logger = get_run_logger()
    logger.info('OSIPI get data Process Started')
    df = get_data()
    logger.info('Data Fetched')
    process_data(df)
    logger.info('Process Completed')


if __name__ == '__main__':
    osipi_get_data.serve(name="OSIPI Test",
                        description="Test OSIPI",
                        tags=["onboarding"]
                      )

The code runs fine and can fetch the data from the database, but I'm getting the error below in the Prefect console:

12:50:56.609 | INFO    | prefect.flow_runs.runner - Runner 'OSIPI Test' submitting flow run '4f2233d2-b843-482d-b73a-ee4ad6e117db'
12:50:56.734 | INFO    | prefect.flow_runs.runner - Opening process...
12:50:56.769 | INFO    | prefect.flow_runs.runner - Completed submission of flow run '4f2233d2-b843-482d-b73a-ee4ad6e117db'
12:51:00.905 | INFO    | Flow run 'mini-boobook' - Downloading flow code from storage at '.'
12:51:01.756 | INFO    | Flow run 'mini-boobook' - OSIPI get data Process Started
12:51:10.462 | INFO    | Task run 'get_pi-fa9' - Finished in state Completed()
12:51:11.695 | WARNING | Task run 'get_pi-fa9' - Encountered an error while serializing result for transaction '50d02614c60b588431cacf863d7b7b88': Failed to serialize object of type 'PIServer' with serializer 'pickle'. You can try a different serializer (e.g. result_serializer="json") or disabling persistence (persist_result=False) for this flow or task. Code execution will continue, but the transaction will not be committed.
12:51:11.700 | INFO    | Task run 'OSIPI_31-12-2024 12:51:00-ec2' - Finished in state Completed()
12:51:11.702 | INFO    | Flow run 'mini-boobook' - Data Fetched
12:51:12.281 | ERROR   | Task run 'process_31-12-2024 12:51:00-5fa' - Error encountered when computing cache key - result will not be persisted.
Traceback (most recent call last):
  File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 297, in compute_key
    return hash_objects(hashed_inputs, raise_on_failure=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\utilities\hashing.py", line 89, in hash_objects
    raise HashError(msg)
prefect.exceptions.HashError: Unable to create hash - objects could not be serialized.
  JSON error: Unable to serialize unknown type: <class 'PIconnect.PIData.PISeries'>
  Pickle error: cannot pickle 'AFEnumerationValue' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\task_engine.py", line 158, in compute_transaction_key
    key = self.task.cache_policy.compute_key(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 169, in compute_key
    policy_key = policy.compute_key(
                 ^^^^^^^^^^^^^^^^^^^
  File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 169, in compute_key
    policy_key = policy.compute_key(
                 ^^^^^^^^^^^^^^^^^^^
  File "D:\.virtualenvs\Airflow\Lib\site-packages\prefect\cache_policies.py", line 307, in compute_key
    raise ValueError(msg) from exc
ValueError: Unable to create hash - objects could not be serialized.
  JSON error: Unable to serialize unknown type: <class 'PIconnect.PIData.PISeries'>
  Pickle error: cannot pickle 'AFEnumerationValue' object

This often occurs when task inputs contain objects that cannot be cached like locks, file handles, or other system resources.    

To resolve this, you can:
  1. Exclude these arguments by defining a custom `cache_key_fn`
  2. Disable caching by passing `cache_policy=NONE`

12:51:12.294 | INFO    | Task run 'process_31-12-2024 12:51:00-5fa' - Finished in state Completed()
12:51:12.295 | INFO    | Flow run 'mini-boobook' - Process Completed
12:51:12.422 | INFO    | Flow run 'mini-boobook' - Finished in state Completed()
OSIsoft(r) AF SDK Version: 2.10.9.593
object
12:51:17.817 | INFO    | prefect.flow_runs.runner - Process for flow run 'mini-boobook' exited cleanly.

Is there a way to serialize a database connection in Prefect, or a way to make a database connection persistent so that I don't have to reconnect on each task run?
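For context on why the pickle serializer fails here: connection objects typically hold OS-level resources (sockets, locks, COM handles such as the `AFEnumerationValue` in the traceback) that cannot be pickled by design. A minimal stdlib sketch reproducing the same class of failure, using a hypothetical `FakeConnection` that owns a lock:

```python
import pickle
import threading

class FakeConnection:
    """Stand-in for a PIServer-like object that owns a system resource."""
    def __init__(self):
        # Locks, like sockets and file handles, cannot be pickled.
        self._lock = threading.Lock()

conn = FakeConnection()
try:
    pickle.dumps(conn)
    picklable = True
except TypeError:
    picklable = False

print(picklable)  # False: the lock attribute makes the object unpicklable
```

Any serializer that relies on pickle (as Prefect's default result serializer does) will hit the same wall, regardless of configuration.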

Version info

Python Version: 3.10.8
Prefect Version: 3.1.8
OS: Windows 11 Pro
Pydantic Version: 2.10.4
Database: OSIPI historian
Python package used to connect: PIconnect
PIconnect Version: 0.12.1

Additional context

I tried caching the same database connection in Streamlit using st.cache_resource, and the connection persisted across all sessions and runs. I'm not sure whether similar caching is possible in Prefect.

@amanchaudhary-95 amanchaudhary-95 added the bug Something isn't working label Dec 31, 2024
@zzstoatzz
Collaborator

zzstoatzz commented Dec 31, 2024

hi @amanchaudhary-95 - did you try either of the suggestions given by the error?

This often occurs when task inputs contain objects that cannot be cached like locks, file handles, or other system resources.    

To resolve this, you can:
  1. Exclude these arguments by defining a custom `cache_key_fn`
  2. Disable caching by passing `cache_policy=NONE`

for example, option 1 seems likely to work for your case

docs on this
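A minimal sketch of option 1, assuming Prefect's `cache_key_fn` callback receives the task run context and a dict of parameters and returns a string key. The helper name `hash_picklable_inputs` is hypothetical; the idea is to build the key only from parameters that can actually be pickled, silently skipping connection-like objects:

```python
import hashlib
import pickle

def hash_picklable_inputs(context, parameters):
    """Hypothetical cache_key_fn: hash only the parameters that can be
    pickled, so live connections in the inputs don't break key computation.
    `context` is ignored here for simplicity."""
    digest = hashlib.sha256()
    for name in sorted(parameters):
        try:
            blob = pickle.dumps(parameters[name])
        except Exception:
            continue  # skip unpicklable values such as live connections
        digest.update(name.encode())
        digest.update(blob)
    return digest.hexdigest()

# Usage with Prefect (not executed here):
# @task(cache_key_fn=hash_picklable_inputs)
# def process_data(data, connection=None):
#     ...
```

With this, passing a connection as a task argument no longer raises the `HashError` above, because the unpicklable value simply never enters the key.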

@amanchaudhary-95
Author

Hi @zzstoatzz,

Yes, I had tried both options. Neither of them caches the connection.

@zzstoatzz
Collaborator

zzstoatzz commented Jan 1, 2025

if you need to cache the connection object itself (which is not a common pattern that I've seen personally), then you might need to write your own serializer. however, if you're just trying to access the same connection object from different tasks, you could make it a module-level global and avoid passing it as a task argument

note that the latter option will likely not work if you use my_task.submit(), since most connection objects cannot be used in a thread other than the one they were created in
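One way to sketch that "global" approach is a memoized accessor: tasks call a function that lazily creates the connection once per process, and no connection object ever crosses a task boundary as an argument or return value. `FakeServer` below is a stand-in for `PI.PIServer()`:

```python
from functools import lru_cache

class FakeServer:
    """Stand-in for PI.PIServer(); assume constructing it is expensive."""
    instances = 0
    def __init__(self):
        FakeServer.instances += 1

@lru_cache(maxsize=1)
def get_server():
    """Process-wide connection accessor. Tasks call this instead of
    receiving the connection as a parameter, so Prefect never tries to
    serialize or hash the unpicklable object."""
    return FakeServer()

a = get_server()
b = get_server()
# Both calls return the same instance; the constructor ran only once.
```

The thread caveat above still applies: this shares the object within one process, so with `.submit()` you would also need the connection to be safe to use from worker threads.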

@zzstoatzz
Collaborator

hi @amanchaudhary-95 - I am going to close this issue, as I think an escape hatch exists today via a custom cache key policy. Feel free to comment here if you think this should be reopened
