Recently I’ve been working with Google Cloud Platform Pub/Sub in various Java applications and Apache Beam pipelines. Pub/Sub offers effortless, cloud-native messaging topics for your software applications or big-data processing requirements.
One of the problems I faced was running the Pub/Sub emulator locally as part of my integration test suite. After hunting for ways to do this, I’m happy to present the following code snippet for anyone out there who is looking for the same solution.
package com.kasundon.pubsub; | |
import com.google.api.gax.core.CredentialsProvider; | |
import com.google.api.gax.core.NoCredentialsProvider; | |
import com.google.api.gax.grpc.GrpcTransportChannel; | |
import com.google.api.gax.rpc.AlreadyExistsException; | |
import com.google.api.gax.rpc.FixedTransportChannelProvider; | |
import com.google.api.gax.rpc.TransportChannelProvider; | |
import com.google.cloud.pubsub.v1.MessageReceiver; | |
import com.google.cloud.pubsub.v1.Publisher; | |
import com.google.cloud.pubsub.v1.Subscriber; | |
import com.google.cloud.pubsub.v1.SubscriptionAdminClient; | |
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; | |
import com.google.cloud.pubsub.v1.TopicAdminClient; | |
import com.google.cloud.pubsub.v1.TopicAdminSettings; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import com.google.protobuf.ByteString; | |
import com.google.pubsub.v1.ProjectSubscriptionName; | |
import com.google.pubsub.v1.ProjectTopicName; | |
import com.google.pubsub.v1.PubsubMessage; | |
import com.google.pubsub.v1.PushConfig; | |
import com.google.pubsub.v1.Subscription; | |
import com.google.pubsub.v1.Topic; | |
import io.grpc.ManagedChannel; | |
import io.grpc.ManagedChannelBuilder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.testcontainers.containers.GenericContainer; | |
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; | |
import java.io.IOException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
public class EmbededPubSub { | |
private final static Logger LOGGER = LoggerFactory.getLogger(EmbededPubSub.class); | |
private final static int PUBSUB_PORT = 5189; | |
private final static String PROJECT_ID = "my-project-id"; | |
private final static String TOPIC_ID = "my-test-topic"; | |
private final static String SUBSCRIPTION_ID = "my-subscription-id"; | |
public static final GenericContainer PUBSUB_CONTAINER = | |
new GenericContainer("google/cloud-sdk:latest") | |
.withExposedPorts(PUBSUB_PORT) | |
.withCommand( | |
"/bin/sh", | |
"-c", | |
String.format( | |
"gcloud beta emulators pubsub start --project %s --host-port=0.0.0.0:%d", | |
PROJECT_ID, | |
PUBSUB_PORT | |
) | |
) | |
.waitingFor(new LogMessageWaitStrategy().withRegEx("(?s).*started.*$")); | |
private String serviceHostname; | |
private ManagedChannel channel; | |
private TransportChannelProvider channelProvider; | |
private CredentialsProvider credentialsProvider = NoCredentialsProvider.create(); | |
public void start() { | |
PUBSUB_CONTAINER.start(); | |
serviceHostname = String.format("127.0.0.1:%d", PUBSUB_CONTAINER.getMappedPort(PUBSUB_PORT)); | |
channel = ManagedChannelBuilder | |
.forTarget(serviceHostname) | |
.usePlaintext(true) | |
.build(); | |
channelProvider = FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); | |
createTopicAndSubscription(TOPIC_ID); | |
LOGGER.debug(" @@ PubSub service successfully started!"); | |
} | |
public void stop() { | |
try { | |
channel.awaitTermination(2000, TimeUnit.MILLISECONDS); | |
} catch (InterruptedException e) { | |
//noop | |
} | |
PUBSUB_CONTAINER.stop(); | |
} | |
public void createTopicAndSubscription( | |
String topicName | |
) { | |
ProjectTopicName projectTopicName = ProjectTopicName.of(PROJECT_ID, topicName); | |
ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID); | |
SubscriptionAdminSettings subscriptionAdminSettings; | |
try { | |
TopicAdminClient topicAdminClient = TopicAdminClient.create( | |
TopicAdminSettings.newBuilder() | |
.setTransportChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build()); | |
Topic topic = topicAdminClient.createTopic(projectTopicName); | |
LOGGER.debug(" @@ Topic created : {} ", topic.getName()); | |
subscriptionAdminSettings = SubscriptionAdminSettings | |
.newBuilder() | |
.setTransportChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build(); | |
SubscriptionAdminClient subscriptionAdminClient = | |
SubscriptionAdminClient | |
.create(subscriptionAdminSettings); | |
Subscription subscription = | |
subscriptionAdminClient | |
.createSubscription(projectSubscriptionName, projectTopicName, PushConfig.getDefaultInstance(), 0); | |
LOGGER.debug(" @@ Subscription created : {} ", subscription.getName()); | |
} catch (IOException | AlreadyExistsException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void publish( | |
String message | |
) { | |
try { | |
ProjectTopicName topic = ProjectTopicName.of(PROJECT_ID, TOPIC_ID); | |
Publisher publisher = | |
Publisher.newBuilder(topic) | |
.setChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build(); | |
ByteString data = ByteString.copyFromUtf8(message); | |
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); | |
publisher.publish(pubsubMessage); | |
publisher.shutdown(); | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
public void subscriber( | |
Consumer<String> consumer, | |
long timeoutMs | |
) { | |
ExecutorService executorService = Executors.newSingleThreadExecutor(); | |
executorService.execute(() -> { | |
ProjectSubscriptionName subscription = ProjectSubscriptionName.of(PROJECT_ID, SUBSCRIPTION_ID); | |
MessageReceiver receiver = (message, ackReplyConsumer) -> { | |
consumer.accept(message.getData().toStringUtf8()); | |
//ackReplyConsumer.ack(); | |
}; | |
Subscriber subscriber = null; | |
try { | |
subscriber = | |
Subscriber | |
.newBuilder(subscription, receiver) | |
.setChannelProvider(channelProvider) | |
.setCredentialsProvider(credentialsProvider) | |
.build(); | |
subscriber.addListener(new Subscriber.Listener() { | |
@Override | |
public void failed(Subscriber.State from, Throwable failure) { | |
LOGGER.error(" @@ failure detected : ", failure); | |
} | |
}, MoreExecutors.directExecutor()); | |
subscriber.startAsync().awaitRunning(); | |
Thread.sleep(timeoutMs); | |
} catch (InterruptedException e) { | |
throw new RuntimeException(e); | |
} finally { | |
if (subscriber != null) { | |
subscriber.stopAsync().awaitTerminated(); | |
} | |
} | |
}); | |
} | |
public String getServiceHostname() { | |
if (serviceHostname == null) { | |
throw new IllegalStateException("Embedded PubSub Service not started yet."); | |
} | |
return String.format("http://%s", serviceHostname); | |
} | |
public String getFullyQualifiedTopicName() { | |
return String.format("projects/%s/topics/%s", PROJECT_ID, TOPIC_ID); | |
} | |
} |
Dependencies
<!-- Google Cloud Pub/Sub client library: provides the com.google.cloud.pubsub.v1 classes
     (Publisher, Subscriber, TopicAdminClient, SubscriptionAdminClient) used by EmbededPubSub. -->
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.35.0</version>
</dependency>
<!-- Testcontainers: provides GenericContainer and LogMessageWaitStrategy, which start the
     emulator container. NOTE(review): for test-only use this presumably belongs in test scope;
     confirm against your build. -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.7.0</version>
</dependency>
Usage
EmbededPubSub embededPubSub = new EmbededPubSub();
embededPubSub.start();
try {
    // subscriber(...) returns immediately and receives on a background thread, so wait for the
    // message to arrive before stopping the emulator; otherwise it may never be delivered.
    java.util.concurrent.CountDownLatch received = new java.util.concurrent.CountDownLatch(1);
    embededPubSub.subscriber(message -> {
        System.out.println(" @@ Received : " + message);
        received.countDown();
    }, 10000);
    embededPubSub.publish("Hello Pub/Sub!");
    // await(...) throws InterruptedException: declare or handle it in the enclosing test method.
    received.await(10, java.util.concurrent.TimeUnit.SECONDS);
} finally {
    // Always stop the container, even when the test body fails.
    embededPubSub.stop();
}
I’m using Testcontainers to spin up a container that runs the gcloud CLI components and exposes a port through which the host machine can communicate with the emulator.
One more thing: if you ever need to pull or browse messages, subscriptions, or other information from the local Pub/Sub emulator for debugging purposes, unfortunately the `gcloud` CLI tools cannot be used for this at the time of writing this blog post. The workarounds are to either write client code or use the Pub/Sub REST API.
Pub/Sub Rest API
https://cloud.google.com/pubsub/docs/reference/rest/
I hope this article will be useful for anybody who is struggling to get their head around Pub/Sub integration testing.