Skip to content

Commit

Permalink
deps: update dependency com.google.cloud:google-cloud-pubsublite to v…
Browse files Browse the repository at this point in the history
…1.4.8 (#343)
  • Loading branch information
jiangmichaellll authored Jan 11, 2022
1 parent d13fe08 commit a191de9
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 24 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>1.4.6</version>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>com.google.api.grpc</groupId>
Expand Down
2 changes: 1 addition & 1 deletion samples/snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>1.4.6</version>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>1.4.6</version>
<version>1.4.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package com.google.cloud.pubsublite.spark;

import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
Expand All @@ -31,7 +33,6 @@
import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.ServiceClients;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
Expand Down Expand Up @@ -125,26 +126,26 @@ public abstract static class Builder {
}

MultiPartitionCommitter newMultiPartitionCommitter(long topicPartitionCount) {
CursorServiceClient serviceClient = newCursorServiceClient();
return new MultiPartitionCommitterImpl(
topicPartitionCount,
(partition) ->
CommitterSettings.newBuilder()
.setSubscriptionPath(this.subscriptionPath())
.setPartition(partition)
.setServiceClient(newCursorServiceClient())
.setStreamFactory(
responseStream ->
serviceClient.streamingCommitCursorCallable().splitCall(responseStream))
.build()
.instantiate());
}

@SuppressWarnings("CheckReturnValue")
PartitionSubscriberFactory getSubscriberFactory() {
return (partition, offset, consumer) -> {
PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder()
.setCredentialsProvider(new PslCredentialsProvider(credentialsKey()));
ServiceClients.addDefaultMetadata(
context, RoutingMetadata.of(this.subscriptionPath(), partition), settingsBuilder);
try {
SubscriberServiceClient serviceClient =
SubscriberServiceClient.create(
Expand All @@ -153,8 +154,15 @@ PartitionSubscriberFactory getSubscriberFactory() {
return SubscriberBuilder.newBuilder()
.setSubscriptionPath(this.subscriptionPath())
.setPartition(partition)
.setServiceClient(serviceClient)
.setMessageConsumer(consumer)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition));
return serviceClient.subscribeCallable().splitCall(responseStream, context);
})
.setInitialLocation(
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
package com.google.cloud.pubsublite.spark;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
Expand All @@ -30,6 +31,7 @@
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
Expand Down Expand Up @@ -84,30 +86,46 @@ public PslCredentialsProvider getCredentialProvider() {
return new PslCredentialsProvider(credentialsKey());
}

private PartitionPublisherFactory getPartitionPublisherFactory() {
PublisherServiceClient client = newServiceClient();
return new PartitionPublisherFactory() {
@Override
public Publisher<MessageMetadata> newPublisher(Partition partition) throws ApiException {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(topicPath())
.setPartition(partition)
.setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(topicPath(), partition));
return client.publishCallable().splitCall(responseStream, context);
});
return singlePartitionBuilder.build();
}

@Override
public void close() {
client.close();
}
};
}

public Publisher<MessageMetadata> createNewPublisher() {
return PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition ->
SinglePartitionPublisherBuilder.newBuilder()
.setTopic(topicPath())
.setPartition(partition)
.setServiceClient(newServiceClient(partition))
.setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
.build())
.setPublisherFactory(getPartitionPublisherFactory())
.setAdminClient(getAdminClient())
.build()
.instantiate();
}

private PublisherServiceClient newServiceClient(Partition partition) throws ApiException {
private PublisherServiceClient newServiceClient() throws ApiException {
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
settingsBuilder = settingsBuilder.setCredentialsProvider(getCredentialProvider());
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(topicPath(), partition),
settingsBuilder);
try {
return PublisherServiceClient.create(
addDefaultSettings(topicPath().location().extractRegion(), settingsBuilder));
Expand Down

0 comments on commit a191de9

Please sign in to comment.