Skip to content

Commit

Permalink
Support demand endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
marijababi committed Jun 27, 2024
1 parent 47a4e55 commit e634173
Show file tree
Hide file tree
Showing 7 changed files with 748 additions and 3 deletions.
91 changes: 91 additions & 0 deletions src/volur/api/v1alpha1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from google.rpc.status_pb2 import Status
from loguru import logger
from volur.api.v1alpha1.settings import VolurApiSettings
from volur.pork.demand.v1alpha2 import demand_pb2, demand_pb2_grpc
from volur.pork.materials.v1alpha3 import material_pb2, material_pb2_grpc
from volur.pork.products.v1alpha3 import product_pb2, product_pb2_grpc

Expand Down Expand Up @@ -202,3 +203,93 @@ async def generate_requests() -> (
code=code,
message=message,
)

async def upload_demand_information(
self: "VolurApiAsyncClient",
demand: AsyncIterator[demand_pb2.Demand],
) -> Status:
"""Uploads Demand Information to the Völur platform using the Völur
API.
This method is using a source to get the demand data and then send
it to the Völur API. This method is asynchronous and will return a
status of the operation.
Args:
demand: a source of demand data to be uploaded to the Völur
platform.
Returns:
The status of the operation.
"""

async def generate_requests() -> (
AsyncIterator[demand_pb2.UploadDemandInformationRequest]
):
try:
async for dem in demand:
yield demand_pb2.UploadDemandInformationRequest(
demand=dem,
)
except Exception:
logger.exception(
"error occurred while generating requests",
)

try:
logger.info("start uploading demand data")
channel = grpc.aio.secure_channel(
self.settings.address,
grpc.ssl_channel_credentials(),
)
stub = demand_pb2_grpc.DemandInformationServiceStub(channel)
requests = generate_requests()
stream = stub.UploadDemandInformation(
requests, # type: ignore[arg-type]
metadata=(
(
"authorization",
f"Bearer {self.settings.token.get_secret_value()}",
),
),
)
while True:
response = await stream.read() # type: ignore[attr-defined]
if response == grpc.aio.EOF: # type: ignore[attr-defined]
logger.info("successfully uploaded demand information")
break
if response.HasField("status"):
if response.status.code != 0:
logger.error(
f"error occurred while uploading demand information "
f"{response.status.code} {response.status.message}",
)
else:
logger.debug(
"successfully uploaded demand information",
)
else:
raise ValueError("response from a server does not contain status")
return Status(code=0)
except grpc.aio.AioRpcError as rpc_error:
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
logger.error(
"used token in invalid,"
"please set a valid token using"
"`VOLUR_API_TOKEN` environment variable",
)
else:
with logger.contextualize(
rpc_error_code=rpc_error.code(),
rpc_error_details=rpc_error.details(),
):
logger.exception(
"error occurred while uploading demand information",
)
code: int
code, _ = rpc_error.code().value # type: ignore[misc]
message = _ if (_ := rpc_error.details()) else ""
return Status(
code=code,
message=message,
)
18 changes: 17 additions & 1 deletion src/volur/sdk/v1alpha2/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from loguru import logger
from volur.api.v1alpha1.client import VolurApiAsyncClient
from volur.sdk.v1alpha2.sources import MaterialsSource, ProductsSource
from volur.sdk.v1alpha2.sources import DemandSource, MaterialsSource, ProductsSource


@dataclass
Expand Down Expand Up @@ -60,3 +60,19 @@ def upload_products_information(
response_status_message=result.message,
)
logger.info("successfully uploaded products information")

def upload_demand_information(
self: "VolurClient",
demand: DemandSource,
) -> None:
result = asyncio.run(
self.api.upload_demand_information(demand),
debug=self.api.settings.debug,
)
if result.code != 0:
logger.error(
"error occurred while uploading demand information",
response_status_code=result.code,
response_status_message=result.message,
)
logger.info("successfully uploaded demand information")
3 changes: 2 additions & 1 deletion src/volur/sdk/v1alpha2/sources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .csv import MaterialsSource, ProductsSource
from .csv import DemandSource, MaterialsSource, ProductsSource

__all__ = [
"MaterialsSource",
"ProductsSource",
"DemandSource",
]
5 changes: 4 additions & 1 deletion src/volur/sdk/v1alpha2/sources/csv/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from volur.sdk.v1alpha2.sources.csv.base import ProductsSource
from volur.sdk.v1alpha2.sources.csv.base import DemandSource, ProductsSource

from .base import (
CharacteristicColumn,
Expand All @@ -12,13 +12,16 @@
QuantityColumn,
)
from .source import (
DemandCSVFileSource,
MaterialsCSVFileSource,
ProductsCSVFileSource,
)

__all__ = [
"ProductsSource",
"ProductsCSVFileSource",
"DemandSource",
"DemandCSVFileSource",
"CharacteristicColumn",
"CharacteristicColumnBool",
"CharacteristicColumnFloat",
Expand Down
29 changes: 29 additions & 0 deletions src/volur/sdk/v1alpha2/sources/csv/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, AsyncIterator, Literal

from google.type.date_pb2 import Date
from volur.pork.demand.v1alpha2 import demand_pb2
from volur.pork.materials.v1alpha3 import material_pb2
from volur.pork.products.v1alpha3 import product_pb2
from volur.pork.shared.v1alpha1 import characteristic_pb2, quantity_pb2
Expand Down Expand Up @@ -69,6 +70,34 @@ async def __anext__(
...


@dataclass
class DemandSource:
"""
Base class for the demand sources.
This class in an abstract class that defines the interface for the demand CSV
source.
"""

@abc.abstractmethod
def __aiter__(self: "DemandSource") -> AsyncIterator[demand_pb2.Demand]:
"""DemandSource implements Asynchronous Iterator.
This allows you to use any implementation of DemandSource as
```python title="example.py" linenums="1"
source = DemandSourceImplementation()
for _ in source:
# do something with Demand
```
"""
...

@abc.abstractmethod
async def __anext__(
self: "DemandSource",
) -> demand_pb2.Demand:
"""You can fetch the next element in the asynchronous iterator."""
...


@dataclass
class Column:
column_name: InitVar[str | int]
Expand Down
Loading

0 comments on commit e634173

Please sign in to comment.