This directory contains 3 samples for the Pub/Sub Lite Spark Connector:
- Word count sample. The sample reads raw single-word-count messages from Pub/Sub Lite, performs the word count aggregation in Spark, and writes the results back to Pub/Sub Lite. Note that the topic/subscription used for reading is different from the topic/subscription used for writing and verifying the final word count results.
- Simple read sample. The sample reads messages from Pub/Sub Lite and writes them to the console sink.
- Simple write sample. The sample creates a DataFrame inside Spark and writes it to Pub/Sub Lite.
Please see the Google Cloud authentication guide. The recommended approach is to use Application Default Credentials by setting the GOOGLE_APPLICATION_CREDENTIALS environment variable.
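For example, a minimal sketch (the key file path below is a placeholder for your own service account key):

```sh
# Point Application Default Credentials at a service account key file.
# Replace the path with the location of your own JSON key.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/your-service-account-key.json
```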
For the word count sample, set the following environment variables. Note that SOURCE_TOPIC_ID and SOURCE_SUBSCRIPTION_ID are used to read the raw single-word-count messages, while DESTINATION_TOPIC_ID and DESTINATION_SUBSCRIPTION_ID are used for the final word count results. They must be different.
export PROJECT_NUMBER=12345 # or your project number
export REGION=us-central1 # or your region
export ZONE_ID=b # or your zone id
export SOURCE_TOPIC_ID=test-topic # or your topic id to create
export SOURCE_SUBSCRIPTION_ID=test-subscription # or your subscription to create
export DESTINATION_TOPIC_ID=test-topic-2 # or your topic id to create, this is different from SOURCE_TOPIC_ID!
export DESTINATION_SUBSCRIPTION_ID=test-subscription-2 # or your subscription to create, this is different from SOURCE_SUBSCRIPTION_ID!
export CLUSTER_NAME=waprin-spark7 # or your Dataproc cluster name to create
export BUCKET=gs://your-gcs-bucket
export CONNECTOR_VERSION= # latest pubsublite-spark-sql-streaming release version
To run the word count sample on a Dataproc cluster, either use the provided bash script (word_count_sample.sh run) or follow these steps:
cd samples/snippets
- Set extra environment variables.
export SAMPLE_VERSION=$(mvn -q \
  -Dexec.executable=echo \
  -Dexec.args='${project.version}' \
  --non-recursive \
  exec:exec)
export SOURCE_SUBSCRIPTION_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SOURCE_SUBSCRIPTION_ID
export DESTINATION_TOPIC_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/topics/$DESTINATION_TOPIC_ID
- Create both the source and destination topics and subscriptions, and publish word count messages to the source
topic.
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.PublishWords
- Create a Dataproc cluster
gcloud dataproc clusters create $CLUSTER_NAME --region=$REGION --zone=$REGION-$ZONE_ID --image-version=1.5-debian10 --scopes=cloud-platform
- Package sample jar
mvn clean package -Dmaven.test.skip=true
- Download pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar from Maven Central and set the PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION environment variable to its location (a download sketch follows these steps).
- Create a GCS bucket and upload both pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar and the sample jar onto GCS.
gsutil mb $BUCKET
gsutil cp target/pubsublite-spark-snippets-$SAMPLE_VERSION.jar $BUCKET
gsutil cp $PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION $BUCKET
- Set Dataproc region
gcloud config set dataproc/region $REGION
- Run the sample in Dataproc. This will perform word count aggregation and publish word count results to Pub/Sub Lite.
gcloud dataproc jobs submit spark --cluster=$CLUSTER_NAME \
  --jars=$BUCKET/pubsublite-spark-snippets-$SAMPLE_VERSION.jar,$BUCKET/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar \
  --class=pubsublite.spark.WordCount \
  --properties=spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.SOURCE_SUBSCRIPTION_PATH=$SOURCE_SUBSCRIPTION_PATH,spark.yarn.appMasterEnv.DESTINATION_TOPIC_PATH=$DESTINATION_TOPIC_PATH
- Read the word count results from Pub/Sub Lite; you should see the results in the console output.
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.ReadResults
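For the download step above, here is a sketch of fetching the connector jar from Maven Central and setting the environment variable. The repository path assumes the artifact is published under the com.google.cloud group with artifactId pubsublite-spark-sql-streaming; verify the coordinates for your CONNECTOR_VERSION.

```sh
# Download the connector jar with dependencies from Maven Central
# (assumed coordinates: com.google.cloud:pubsublite-spark-sql-streaming).
curl -L -O "https://repo1.maven.org/maven2/com/google/cloud/pubsublite-spark-sql-streaming/$CONNECTOR_VERSION/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar"

# Point the jar location variable at the downloaded file.
export PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION=$PWD/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar
```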
To clean up, either use the provided bash script (word_count_sample.sh clean) or follow these steps:
- Delete the Pub/Sub Lite topics and subscriptions.
gcloud pubsub lite-subscriptions delete $SOURCE_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $SOURCE_TOPIC_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-subscriptions delete $DESTINATION_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $DESTINATION_TOPIC_ID --zone=$REGION-$ZONE_ID
- Delete GCS bucket.
gsutil -m rm -rf $BUCKET
- Delete Dataproc cluster.
gcloud dataproc clusters delete $CLUSTER_NAME --region=$REGION
For the simple read sample, set the following environment variables.
export PROJECT_NUMBER=12345 # or your project number
export REGION=us-central1 # or your region
export ZONE_ID=b # or your zone id
export SOURCE_TOPIC_ID=test-topic # or your topic id to create
export SOURCE_SUBSCRIPTION_ID=test-subscription # or your subscription to create
export CLUSTER_NAME=waprin-spark7 # or your Dataproc cluster name to create
export BUCKET=gs://your-gcs-bucket
export CONNECTOR_VERSION= # latest pubsublite-spark-sql-streaming release version
To run the simple read sample on a Dataproc cluster, either use the provided bash script (simple_read_sample.sh run) or follow these steps:
cd samples/snippets
- Set extra environment variables.
export SAMPLE_VERSION=$(mvn -q \
  -Dexec.executable=echo \
  -Dexec.args='${project.version}' \
  --non-recursive \
  exec:exec)
export SOURCE_SUBSCRIPTION_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/subscriptions/$SOURCE_SUBSCRIPTION_ID
- Create both the source and destination topics and subscriptions, and publish word count messages to the source
topic.
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.PublishWords
- Create a Dataproc cluster
gcloud dataproc clusters create $CLUSTER_NAME --region=$REGION --zone=$REGION-$ZONE_ID --image-version=1.5-debian10 --scopes=cloud-platform
- Package sample jar
mvn clean package -Dmaven.test.skip=true
- Download pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar from Maven Central and set the PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION environment variable to its location.
- Create a GCS bucket and upload both pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar and the sample jar onto GCS.
gsutil mb $BUCKET
gsutil cp target/pubsublite-spark-snippets-$SAMPLE_VERSION.jar $BUCKET
gsutil cp $PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION $BUCKET
- Set Dataproc region
gcloud config set dataproc/region $REGION
- Run the sample in Dataproc. You should see the messages in the console output.
gcloud dataproc jobs submit spark --cluster=$CLUSTER_NAME \
  --jars=$BUCKET/pubsublite-spark-snippets-$SAMPLE_VERSION.jar,$BUCKET/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar \
  --class=pubsublite.spark.SimpleRead \
  -- $SOURCE_SUBSCRIPTION_PATH
To clean up, either use the provided bash script (simple_read_sample.sh clean) or follow these steps:
- Delete Pub/Sub Lite topic and subscription.
gcloud pubsub lite-subscriptions delete $SOURCE_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $SOURCE_TOPIC_ID --zone=$REGION-$ZONE_ID
- Delete GCS bucket.
gsutil -m rm -rf $BUCKET
- Delete Dataproc cluster.
gcloud dataproc clusters delete $CLUSTER_NAME --region=$REGION
For the simple write sample, set the following environment variables.
export PROJECT_NUMBER=12345 # or your project number
export REGION=us-central1 # or your region
export ZONE_ID=b # or your zone id
export DESTINATION_TOPIC_ID=test-topic # or your topic id to create
export DESTINATION_SUBSCRIPTION_ID=test-subscription # or your subscription to create
export CLUSTER_NAME=waprin-spark7 # or your Dataproc cluster name to create
export BUCKET=gs://your-gcs-bucket
export CONNECTOR_VERSION= # latest pubsublite-spark-sql-streaming release version
To run the simple write sample on a Dataproc cluster, either use the provided bash script (simple_write_sample.sh run) or follow these steps:
cd samples/snippets
- Set extra environment variables.
export SAMPLE_VERSION=$(mvn -q \
  -Dexec.executable=echo \
  -Dexec.args='${project.version}' \
  --non-recursive \
  exec:exec)
export DESTINATION_TOPIC_PATH=projects/$PROJECT_NUMBER/locations/$REGION-$ZONE_ID/topics/$DESTINATION_TOPIC_ID
- Create both the source and destination topics and subscriptions, and publish word count messages to the source
topic.
mvn compile exec:java -Dexec.mainClass=pubsublite.spark.PublishWords
- Create a Dataproc cluster
gcloud dataproc clusters create $CLUSTER_NAME --region=$REGION --zone=$REGION-$ZONE_ID --image-version=1.5-debian10 --scopes=cloud-platform
- Package sample jar
mvn clean package -Dmaven.test.skip=true
- Download pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar from Maven Central and set the PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION environment variable to its location.
- Create a GCS bucket and upload both pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar and the sample jar onto GCS.
gsutil mb $BUCKET
gsutil cp target/pubsublite-spark-snippets-$SAMPLE_VERSION.jar $BUCKET
gsutil cp $PUBSUBLITE_SPARK_SQL_STREAMING_JAR_LOCATION $BUCKET
- Set Dataproc region
gcloud config set dataproc/region $REGION
- Run the sample in Dataproc. This will create a DataFrame inside Spark and write it to Pub/Sub Lite.
gcloud dataproc jobs submit spark --cluster=$CLUSTER_NAME \
  --jars=$BUCKET/pubsublite-spark-snippets-$SAMPLE_VERSION.jar,$BUCKET/pubsublite-spark-sql-streaming-$CONNECTOR_VERSION-with-dependencies.jar \
  --class=pubsublite.spark.SimpleWrite \
  --properties=spark.submit.deployMode=cluster,spark.yarn.appMasterEnv.DESTINATION_TOPIC_PATH=$DESTINATION_TOPIC_PATH
To clean up, either use the provided bash script (simple_write_sample.sh clean) or follow these steps:
- Delete the Pub/Sub Lite topic and subscription.
gcloud pubsub lite-subscriptions delete $DESTINATION_SUBSCRIPTION_ID --zone=$REGION-$ZONE_ID
gcloud pubsub lite-topics delete $DESTINATION_TOPIC_ID --zone=$REGION-$ZONE_ID
- Delete GCS bucket.
gsutil -m rm -rf $BUCKET
- Delete Dataproc cluster.
gcloud dataproc clusters delete $CLUSTER_NAME --region=$REGION
Common issues:
- Permission not granted. This could happen when creating a topic and a subscription, or when submitting a job to your Dataproc cluster. Make sure your service account has at least Editor permissions for Pub/Sub Lite and Dataproc. Your Dataproc cluster needs scope=cloud-platform to access other services and resources within the same project. Your gcloud and GOOGLE_APPLICATION_CREDENTIALS should access the same project. Check which project your gcloud and gsutil commands use with gcloud config get-value project (see the checks after this list).
- Your Dataproc job fails with ClassNotFound or similar exceptions. Make sure your Dataproc cluster uses images of supported Spark versions.
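A few hedged checks for the issues above; these are sketches that assume a JSON service account key and the standard gcloud resource fields, so adjust them to your setup:

```sh
# Check which project your gcloud/gsutil commands target.
gcloud config get-value project

# Check which project the key behind GOOGLE_APPLICATION_CREDENTIALS
# belongs to (assumes a JSON service account key file).
grep '"project_id"' "$GOOGLE_APPLICATION_CREDENTIALS"

# Grant broad Editor permissions to a service account (placeholder
# project id and service account email; scope the role down if you can).
gcloud projects add-iam-policy-binding your-project-id \
  --member="serviceAccount:your-sa@your-project-id.iam.gserviceaccount.com" \
  --role="roles/editor"

# Check the Dataproc image version (and hence the Spark version) of the
# cluster; the --format path assumes the standard softwareConfig field.
gcloud dataproc clusters describe $CLUSTER_NAME --region=$REGION \
  --format="value(config.softwareConfig.imageVersion)"
```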