This is an end-to-end demo using BigQuery continuous queries to address abandoned ecommerce shopping carts. This was demoed onstage at Google Cloud Next 2024 [recording].
BigQuery continuous queries are SQL statements that run continuously, allowing you to analyze incoming data in BigQuery in real time. You can insert the output rows produced by a continuous query into a BigQuery table or export them to Pub/Sub or Bigtable.
Documentation on BigQuery continuous queries can be found HERE, and a blog that provides more context on BigQuery continuous queries can be found HERE.
Be sure to request your project(s) be allowlisted for BigQuery continuous queries by submitting this request form.
Imagine this: You've poured your heart into creating a fantastic product, attracted potential customers to your website, and they've even added items to their cart. But then, they vanish without completing the purchase. Frustrating, right? Shopping cart abandonment is a widespread issue; the average cart abandonment rate hovers around a disheartening 70% according to the Baymard Institute. One solution? Real-time engagement that rekindles their interest with a BigQuery continuous query.
To demonstrate this, let's build a quick and dirty demo. We'll use a BigQuery table named "abandoned_carts" that logs our website's abandoned cart events and captures the customer's contact information, the abandoned cart contents, and the abandonment time. We'll run a BigQuery continuous query that constantly monitors this "abandoned_carts" table for new events, sends any new abandoned carts through Google Cloud Gemini to generate a tailored promotional email for each customer with product suggestions and perhaps a limited-time discount, and publishes the personalized email content to a "recapture_customer" Pub/Sub topic. Lastly, we'll use a simple Application Integration platform trigger to send an email for each Pub/Sub message received.
-
Ensure your project has enabled the Vertex AI API and Pub/Sub API
-
Ensure your user account has the appropriate IAM permissions [ref]. During this demo, we'll run the continuous query with a Service Account as we'll be writing to a Pub/Sub topic.
-
Create a dataset and table in your project by running the following SQL query in your BigQuery environment:
#Creates a dataset named Continuous_Queries_Demo. Be sure to replace the project production-242320 with your own Project ID.
CREATE SCHEMA `production-242320.Continuous_Queries_Demo`;

#Creates a table named abandoned_carts.
CREATE TABLE `Continuous_Queries_Demo.abandoned_carts`(
  customer_name STRING,
  customer_email STRING,
  last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  products STRING);
-
Create a BigQuery remote connection named "continuous-queries-connection" in the Cloud Console using these steps.
-
After the connection has been created, click "Go to connection", and in the Connection Info pane, copy the service account ID for use in the next step.
-
Grant Vertex AI User role IAM access to the service account ID you just copied using these steps.
-
Create a BigQuery ML remote model with Gemini 1.5 Pro by running the following SQL query in your BigQuery environment:
#Creates a BigQuery ML remote model named gemini_1_5_pro
CREATE MODEL `Continuous_Queries_Demo.gemini_1_5_pro`
  REMOTE WITH CONNECTION `us.continuous-queries-connection`
  OPTIONS (endpoint = 'gemini-1.5-pro');
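Optionally, before wiring the model into a continuous query, you can smoke-test it with a one-off query. The following is a minimal sketch, assuming the model name above (the prompt is just an illustration):

#Optional smoke test of the gemini_1_5_pro remote model.
SELECT ml_generate_text_llm_result
FROM ML.GENERATE_TEXT(
  MODEL `Continuous_Queries_Demo.gemini_1_5_pro`,
  (SELECT "Write a one-sentence thank-you note to a customer." AS prompt),
  STRUCT(256 AS max_output_tokens, TRUE AS flatten_json_output));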
-
Create a BigQuery Service Account named "bq-continuous-query-sa", granting yourself permissions to submit a job that runs using the service account [ref], and granting permissions to the service account itself to access BigQuery resources [ref].
NOTE: if you have issues with this demo, nine times out of ten it is related to an IAM permissions issue.
-
Create a Pub/Sub topic [ref] named "recapture_customer", with a default subscription, which you'll write the results of your continuous query to.
-
Grant the service account you created earlier ("bq-continuous-query-sa") permissions to the Pub/Sub topic with the Pub/Sub Viewer and Pub/Sub Publisher roles [ref].
Google Cloud's Application Integration platform offers a comprehensive set of core integration tools to connect and manage the multitude of applications (Google Cloud services and third-party SaaS). We'll use it to create a trigger based on our Pub/Sub topic and send an email based on the contents of the Pub/Sub message.
-
Set up an Application Integration environment by following the Quick Setup Instructions.
-
Once set up, click the CREATE INTEGRATION button and name your integration "abandoned-shopping-carts-integration".
-
Click Triggers at the top of the Application Integration bar, search for "Cloud Pub/Sub", and add your Pub/Sub trigger onto the canvas.
-
Under Trigger Input, add the name of the Pub/Sub Topic and the "bq-continuous-query-sa" service account you previously created.
-
If you see a warning that says "Grant the necessary roles", click GRANT.
-
Click Tasks at the top of the Application Integration bar, search for "Data Mapping", and add the Data Mapping item to your canvas.
-
Connect the Cloud Pub/Sub Trigger to the Data Mapping item.
-
Click the Data Mapping item and click the button that says "OPEN DATA MAPPING EDITOR"
-
You'll create four Input variables, each initially starting as "CloudPubSubMessage.data":
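For reference, each Pub/Sub message's data payload will be the JSON string produced by the continuous query you'll create later in this walkthrough, shaped roughly like this (values are illustrative):

{"customer_name": "Your_Shopper's_Name", "customer_email": "Your.Shoppers.Email@gmail.com", "customer_message": "<html>...personalized email...</html>"}

The four mappings below simply expose the full message plus each of these properties as integration variables.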
-
For the first Input's Output, click "create a new one" to create a new variable. Name it "message_output", change the Variable Type to "Output from Integration", change the Data Type to "String", and change the default value to "Empty String".
-
For the second Input, click the Plus icon and select TO_JSON() near the bottom. Click the next Plus and select GET_PROPERTY(), click the "Variable or Value" link, click the Value tab and type in "customer_message". Click Save.
-
For the second Input's Output, click "create a new one". Name this one "customer_message" and set the same configuration settings as the "message_output" above.
-
Two of your four data mappings should now be complete
-
For the third input, click the Plus icon, select TO_JSON, click the next Plus and select GET_PROPERTY(). Click to add a new Value and type in "customer_email". Click Save.
-
For the third Input's Output, click "create a new one". Name this one "customer_email" and set the same configuration settings as the other ones above.
-
For the fourth input, click the Plus icon, select TO_JSON, click the next Plus and select GET_PROPERTY(). Click to add a new Value and type in "customer_name". Click Save.
-
For the fourth Input's Output, click "create a new one". Name this one "customer_name" and set the same configuration settings as the other ones above.
-
All four of your mappings should be complete
-
To the left of "Data Mapping Task Editor" on the top of the screen, click the back arrow to go back to the canvas.
-
Click Tasks at the top of the Application Integration bar, search for "Send Email", and add the Send Email item to your canvas.
-
Connect the Data Mapping item to the Send Email item.
-
Click the Send Email item and, for Task Input, enter the following (for example, map the recipient to the "customer_email" variable, give the email a subject, and use the "customer_message" variable as the HTML email body):
-
Click the "PUBLISH" button on the top right of the Application Integration bar
-
Go back to the Pub/Sub page and to your Pub/Sub Topic named "recapture_customer", which we created earlier. You'll see that you have a new subscription named something like "<your_project_name>recapture_customer<some_random_string>"
-
Click on this subscription, then click the Edit pencil button at the top of the screen.
-
Under the Service Account section, you'll likely see a warning which states "Note: Cloud Pub/Sub needs the role roles/iam.serviceAccountTokenCreator granted to service account service-<your_project_number>@gcp-sa-pubsub.iam.gserviceaccount.com on this project to create identity tokens. You can change this later."
-
Click the "GRANT" button to grant these permissions. Then click the "UPDATE" button at the bottom of the Edit wizard.
-
Your Application Integration should now be fully deployed.
-
BigQuery continuous queries require a BigQuery Enterprise or Enterprise Plus reservation [ref]. Create one now named "bq-continuous-queries-reservation" in the US multi-region, with 100 slots and a 100-slot baseline (at the time of this writing, BigQuery continuous queries do not support autoscaling).
-
Once the reservation has been created, click on the three dots under Actions, and click "Create assignment".
-
Click Browse and find the project you are using for this demo. Then Select "CONTINUOUS" as the Job Type. Click Create.
NOTE: If you do not see this option, your project or user may not be allowlisted to use the BigQuery continuous queries public preview. Fill out this request form to obtain access.
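If you prefer DDL to the console, the reservation and assignment can also be created with SQL. A rough sketch, assuming you administer reservations in the project you're using for this demo (replace my_project with your own Project ID; the assignment ID is arbitrary):

#Creates an Enterprise reservation with a 100-slot baseline in the US multi-region.
CREATE RESERVATION `my_project.region-us.bq-continuous-queries-reservation`
  OPTIONS (edition = 'ENTERPRISE', slot_capacity = 100);

#Assigns this project's continuous query jobs to the reservation.
CREATE ASSIGNMENT `my_project.region-us.bq-continuous-queries-reservation.continuous_assignment`
  OPTIONS (assignee = 'projects/my_project', job_type = 'CONTINUOUS');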
-
You'll now see your assignment created under your reservation:
-
Go back to the BigQuery SQL editor and paste the following SQL query:
Note: The URI provided in the example below specifies a Pub/Sub Topic as the destination for the continuous query, with the GCP project listed as "my_project" and the Pub/Sub Topic listed as "my_topic". Be sure to change these to your own project/topic. You can also specify different destinations for a BigQuery continuous query, as described in these examples.
EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    #Be sure to replace the Pub/Sub URI with your own Project ID and Pub/Sub Topic ID.
    uri = "https://pubsub.googleapis.com/projects/my_project/topics/my_topic")
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        customer_name AS customer_name,
        customer_email AS customer_email,
        REGEXP_REPLACE(
          REGEXP_EXTRACT(ml_generate_text_llm_result, r"(?im)\<html\>(?s:.)*\<\/html\>"),
          r"(?i)\[your name\]",
          "Your friends at AI Megastore") AS customer_message))
  FROM ML.GENERATE_TEXT(
    MODEL `Continuous_Queries_Demo.gemini_1_5_pro`,
    (SELECT
       customer_name,
       customer_email,
       CONCAT("Write an email to customer ", customer_name,
         ", explaining the benefits and encouraging them to complete their purchase of: ", products,
         ". Also show other items the customer might be interested in. Provide the response email in HTML format.") AS prompt
     FROM `Continuous_Queries_Demo.abandoned_carts`),
    STRUCT(
      1024 AS max_output_tokens,
      0.2 AS temperature,
      1 AS candidate_count,
      TRUE AS flatten_json_output)))
-
Before you can run your query, you must enable BigQuery continuous query mode. In the BigQuery editor, click More -> Continuous Query mode
-
When the window opens, click the button CONFIRM to enable continuous queries for this BigQuery editor tab.
-
Since we are writing the results of this continuous query to a Pub/Sub topic, you must run this query using a Service Account [ref]. We'll use the service account we created earlier. Click More -> Query Settings and scroll down to the Continuous query section and select your service account "bq-continuous-query-sa" and click Save.
-
Your continuous query should now be valid.
-
Click Run to start your continuous query. After about a minute or so, the continuous query will be fully running, ready to receive and process incoming data into your abandoned_carts table.
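To confirm the job from SQL rather than the console, something like the following should work. This is a hedged sketch, assuming your job runs in the US multi-region and that the JOBS view exposes the continuous flag for continuous query jobs:

#Lists recent continuous query jobs in this project and their current state.
SELECT job_id, state, creation_time
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE continuous = TRUE
ORDER BY creation_time DESC
LIMIT 10;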
-
BigQuery continuous queries can read and process data which arrives into BigQuery in a variety of ways [ref]. For the purposes of this end-to-end demo, we'll offer two options: a very simple DML INSERT and streaming data to the abandoned_carts table using the BigQuery Storage Write API.
-
To insert data into your table via a simple DML INSERT, just run the below query from the BigQuery console to insert one new row into your table:
#Simple DML INSERT to add one "abandoned shopping cart" to your table.
#Be sure to change the email address to an email address you can actually access for demo purposes.
INSERT INTO `Continuous_Queries_Demo.abandoned_carts` (customer_name, customer_email, products)
VALUES ("Your_Shopper's_Name", "Your.Shoppers.Email@gmail.com", "Violin Strings, Tiny Saxophone, Guitar Strap")
-
To stream data into your table via the BigQuery Storage Write API [ref], copy the files from the write-api-streaming-example folder of this GitHub repo into a Unix-based development environment (the Google Cloud Shell is great for simple dev/test like this).
-
In this example, we'll use Python, so we'll stream data as protocol buffers. For a quick refresher on working with protocol buffers, here's a great tutorial. Using Python, we'll first align our protobuf messages to the table we created using a .proto file in proto2 format. Use the sample_data.proto file from the write-api-streaming-example folder you downloaded to your developer environment, then run the following command within that folder to generate your protocol buffer definition:
protoc --python_out=. sample_data.proto
-
Within your developer environment, run the sample streaming_script.py Python script to insert some new example abandoned cart events by reading from the abandoned_carts.json file and writing into the abandoned_carts BigQuery table. This code uses the BigQuery Storage Write API to stream a batch of row data by appending proto2 serialized bytes to the serialized_rows repeated field, like the example below:
row = sample_data_pb2.SampleData()
row.customer_name = "Your_Shopper's_Name"
row.customer_email = "Your.Shoppers.Email@gmail.com"
row.products = "Violin Strings, Tiny Saxophone, Guitar Strap"
proto_rows.serialized_rows.append(row.SerializeToString())
-
Within your BigQuery abandoned_carts table, you'll see your newly ingested data:
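If you prefer to check from the query editor rather than the table preview, a quick verification query like this works (assuming the table created earlier):

#Shows the most recently ingested abandoned cart events.
SELECT customer_name, customer_email, products, last_updated
FROM `Continuous_Queries_Demo.abandoned_carts`
ORDER BY last_updated DESC
LIMIT 10;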
-
Now go check your email for the personalized message(s)!
-
Once completed, be sure to clean up your environment by cancelling your continuous query, removing your slot assignment, deleting your slot reservation, deleting your Pub/Sub topic, deleting your Application Integration trigger, etc.
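For the SQL-managed resources, a cleanup sketch might look like the following (assuming the names used in this demo; replace my_project and the assignment ID with your own, and cancel the running continuous query before dropping the reservation):

#Drops the demo dataset along with its table and model.
DROP SCHEMA `Continuous_Queries_Demo` CASCADE;

#Removes the slot assignment and then the reservation.
DROP ASSIGNMENT `my_project.region-us.bq-continuous-queries-reservation.<your_assignment_id>`;
DROP RESERVATION `my_project.region-us.bq-continuous-queries-reservation`;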