Skip to content

Commit

Permalink
feat: adding Feature Store: Streaming ingestion to GA
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 501383040
  • Loading branch information
vertex-sdk-bot authored and copybara-github committed Jan 11, 2023
1 parent deba06b commit 6bc4c84
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 20 deletions.
245 changes: 245 additions & 0 deletions google/cloud/aiplatform/featurestore/_entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
featurestore_online_service as gca_featurestore_online_service,
io as gca_io,
)
from google.cloud.aiplatform.compat.types import types as gca_types
from google.cloud.aiplatform import featurestore
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import utils
Expand Down Expand Up @@ -1539,3 +1540,247 @@ def _construct_dataframe(
data.append(entity_data)

return pd.DataFrame(data=data, columns=["entity_id"] + feature_ids)

def write_feature_values(
self,
instances: Union[
List[gca_featurestore_online_service.WriteFeatureValuesPayload],
Dict[
str,
Dict[
str,
Union[
int,
str,
float,
bool,
bytes,
List[int],
List[str],
List[float],
List[bool],
],
],
],
"pd.DataFrame", # type: ignore # noqa: F821 - skip check for undefined name 'pd'
],
) -> "EntityType": # noqa: F821
"""Streaming ingestion. Write feature values directly to Feature Store.
```
my_entity_type = aiplatform.EntityType(
entity_type_name="my_entity_type_id",
featurestore_id="my_featurestore_id",
)
# writing feature values from a pandas DataFrame
my_dataframe = pd.DataFrame(
data = [
{"entity_id": "movie_01", "average_rating": 4.9}
],
columns=["entity_id", "average_rating"],
)
my_dataframe = my_df.set_index("entity_id")
my_entity_type.write_feature_values(
instances=my_df
)
# writing feature values from a Python dict
my_data_dict = {
"movie_02" : {"average_rating": 3.7}
}
my_entity_type.write_feature_values(
instances=my_data_dict
)
# writing feature values from a list of WriteFeatureValuesPayload objects
payloads = [
gca_featurestore_online_service.WriteFeatureValuesPayload(
entity_id="movie_03",
feature_values=gca_featurestore_online_service.FeatureValue(
double_value=4.9
)
)
]
my_entity_type.write_feature_values(
instances=payloads
)
# reading back written feature values
my_entity_type.read(
entity_ids=["movie_01", "movie_02", "movie_03"]
)
```
Args:
instances (
Union[
List[gca_featurestore_online_service.WriteFeatureValuesPayload],
Dict[str, Dict[str, Union[int, str, float, bool, bytes,
List[int], List[str], List[float], List[bool]]]],
pd.Dataframe]):
Required. Feature values to be written to the Feature Store that
can take the form of a list of WriteFeatureValuesPayload objects,
a Python dict of the form {entity_id : {feature_id : feature_value}, ...},
or a pandas Dataframe, where the index column holds the unique entity
ID strings and each remaining column represents a feature. Each row
in the pandas Dataframe represents an entity, which has an entity ID
and its associated feature values. Currently, a single payload can be
written in a single request.
Returns:
EntityType - The updated EntityType object.
"""

if isinstance(instances, Dict):
payloads = self._generate_payloads(instances=instances)
elif isinstance(instances, List):
payloads = instances
else:
instances_dict = instances.to_dict(orient="index")
payloads = self._generate_payloads(instances=instances_dict)

_LOGGER.log_action_start_against_resource(
"Writing",
"feature values",
self,
)

self._featurestore_online_client.write_feature_values(
entity_type=self.resource_name, payloads=payloads
)

_LOGGER.log_action_completed_against_resource("feature values", "written", self)

return self

@classmethod
def _generate_payloads(
cls,
instances: Dict[
str,
Dict[
str,
Union[
int,
str,
float,
bool,
bytes,
List[int],
List[str],
List[float],
List[bool],
],
],
],
) -> List[gca_featurestore_online_service.WriteFeatureValuesPayload]:
"""Helper method used to generate GAPIC WriteFeatureValuesPayloads from
a Python dict.
Args:
instances (Dict[str, Dict[str, Union[int, str, float, bool, bytes,
List[int], List[str], List[float], List[bool]]]]):
Required. Dict mapping entity IDs to their corresponding features.
Returns:
List[gca_featurestore_online_service.WriteFeatureValuesPayload] -
A list of WriteFeatureValuesPayload objects ready to be written to the Feature Store.
"""
payloads = []
for entity_id, features in instances.items():
feature_values = {}
for feature_id, value in features.items():
feature_value = cls._convert_value_to_gapic_feature_value(
feature_id=feature_id, value=value
)
feature_values[feature_id] = feature_value
payload = gca_featurestore_online_service.WriteFeatureValuesPayload(
entity_id=entity_id, feature_values=feature_values
)
payloads.append(payload)

return payloads

@classmethod
def _convert_value_to_gapic_feature_value(
cls,
feature_id: str,
value: Union[
int, str, float, bool, bytes, List[int], List[str], List[float], List[bool]
],
) -> gca_featurestore_online_service.FeatureValue:
"""Helper method that converts a Python literal value or a list of
literals to a GAPIC FeatureValue.
Args:
feature_id (str):
Required. Name of a feature.
value (Union[int, str, float, bool, bytes,
List[int], List[str], List[float], List[bool]]]):
Required. Python literal value or list of Python literals to
be converted to a GAPIC FeatureValue.
Returns:
gca_featurestore_online_service.FeatureValue - GAPIC object
that represents the value of a feature.
Raises:
ValueError if a list has values that are not all of the same type.
ValueError if feature type is not supported.
"""
if isinstance(value, bool):
feature_value = gca_featurestore_online_service.FeatureValue(
bool_value=value
)
elif isinstance(value, str):
feature_value = gca_featurestore_online_service.FeatureValue(
string_value=value
)
elif isinstance(value, int):
feature_value = gca_featurestore_online_service.FeatureValue(
int64_value=value
)
elif isinstance(value, float):
feature_value = gca_featurestore_online_service.FeatureValue(
double_value=value
)
elif isinstance(value, bytes):
feature_value = gca_featurestore_online_service.FeatureValue(
bytes_value=value
)
elif isinstance(value, List):
if all([isinstance(item, bool) for item in value]):
feature_value = gca_featurestore_online_service.FeatureValue(
bool_array_value=gca_types.BoolArray(values=value)
)
elif all([isinstance(item, str) for item in value]):
feature_value = gca_featurestore_online_service.FeatureValue(
string_array_value=gca_types.StringArray(values=value)
)
elif all([isinstance(item, int) for item in value]):
feature_value = gca_featurestore_online_service.FeatureValue(
int64_array_value=gca_types.Int64Array(values=value)
)
elif all([isinstance(item, float) for item in value]):
feature_value = gca_featurestore_online_service.FeatureValue(
double_array_value=gca_types.DoubleArray(values=value)
)
else:
raise ValueError(
f"Cannot infer feature value for feature {feature_id} with "
f"value {value}! Please ensure every value in the list "
f"is the same type (either int, str, float, bool)."
)

else:
raise ValueError(
f"Cannot infer feature value for feature {feature_id} with "
f"value {value}! {type(value)} type is not supported. "
f"Please ensure value type is an int, str, float, bool, "
f"bytes, or a list of int, str, float, bool."
)
return feature_value
3 changes: 3 additions & 0 deletions google/cloud/aiplatform/preview/featurestore/entity_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
class EntityType(_entity_type._EntityType):
"""Preview EntityType resource for Vertex AI."""

# TODO(b/262275273): Remove preview v1beta1 implementation of `write_feature_values`
# when GA implementation can write multiple payloads per request. Currently, GA
# supports one payload per request.
def write_feature_values(
self,
instances: Union[
Expand Down
2 changes: 1 addition & 1 deletion samples/model-builder/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ def mock_import_feature_values(mock_entity_type):
@pytest.fixture
def mock_write_feature_values(mock_entity_type):
with patch.object(
mock_entity_type.preview, "write_feature_values"
mock_entity_type, "write_feature_values"
) as mock_write_feature_values:
yield mock_write_feature_values

Expand Down
7 changes: 1 addition & 6 deletions samples/model-builder/test_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,7 @@
"title": "The Shawshank Redemption",
"average_rating": 4.7,
"genre": "Drama",
},
"movie_02": {
"title": "Everything Everywhere All At Once",
"average_rating": 4.4,
"genre": "Adventure",
},
}
}
FEATURE_ID = "liked_genres"
FEATURE_IDS = ["age", "gender", "liked_genres"]
Expand Down
7 changes: 1 addition & 6 deletions samples/model-builder/write_feature_values_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,9 @@ def write_feature_values_sample(
"average_rating": 4.7,
"genre": "Drama",
},
"movie_02": {
"title": "Everything Everywhere All At Once",
"average_rating": 4.4,
"genre": "Adventure",
},
}

my_entity_type.preview.write_feature_values(instances=my_data)
my_entity_type.write_feature_values(instances=my_data)


# [END aiplatform_write_feature_values_sample]
5 changes: 4 additions & 1 deletion tests/system/aiplatform/test_featurestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def test_write_features(self, shared_state, caplog):
},
{
"entity_id": "movie_02",
"average_rating": 4.5,
"average_rating": 4.4,
"title": "The Shining",
"genres": ["Horror", "Action"],
},
Expand All @@ -467,6 +467,9 @@ def test_write_features(self, shared_state, caplog):

# Write feature values
movie_entity_type.preview.write_feature_values(instances=movies_df)
movie_entity_type.write_feature_values(
instances={"movie_02": {"average_rating": 4.5}}
)

# Ensure writing feature values overwrites previous values
movie_entity_df_avg_rating_genres = movie_entity_type.read(
Expand Down
Loading

0 comments on commit 6bc4c84

Please sign in to comment.