PubSub to BigQuery – How You Can Stream Messages and Avro Data 

There are many use cases for Google Pub/Sub, including real-time event distribution, collecting server or user interaction events from apps or systems, sending email notifications, and many more. With Pub/Sub, you can not only gather events from many clients but also export them to different databases. In this article, we’ll show you how to connect PubSub to BigQuery using Dataflow, so buckle up. 🙂

What data you can export from Google PubSub to BigQuery

Basically, you can export messages from a PubSub topic, as well as messages and Avro data from a PubSub subscription into a BigQuery table.

This can be done with the open-source Dataflow templates: streaming pipelines that read messages from a Pub/Sub topic or subscription (JSON-formatted, or Avro for the Avro template) and write them to a BigQuery table. You can run templates using the REST API, the gcloud command-line tool, or just the Google Cloud Console. We’ll use the latter, no-code option, to demonstrate how it works.

What you need to export GCP PubSub to BigQuery

First of all, you need a project on Google Cloud Platform. Here are the ingredients required to stream data from PubSub to BigQuery:

  • BigQuery dataset and table
  • Cloud Storage bucket
  • PubSub topic and subscription
  • Service account (not required if you use Google Cloud Console)

Let’s go step by step, so that both the newbies and oldies can follow. 

How to write data from PubSub into BigQuery

BigQuery dataset and table

Read our BigQuery tutorial if you need assistance with setting up a BigQuery project, dataset, and table. 

The schema of your BigQuery table must correspond to the JSON objects in your PubSub messages. For example, for a message formatted as {"name":"Angela", "message":"welcome back"}, the BigQuery table must contain two fields, name and message, both of the STRING data type.

Note: For PubSub messages that don’t match the schema of the BigQuery table, the pipeline creates a separate “Deadletter” table with the default name {outputTableName}_error_records. You can change the default name when setting up a Dataflow job. Here is an example of this Deadletter table:
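If you’d like to sanity-check messages before publishing them, here is a rough Python sketch of what “corresponds to the schema” means for the example above. It assumes a table with only the two STRING columns name and message, treats any missing or extra key as a mismatch, and mimics the routing described in the note by returning either the output table or the default {outputTableName}_error_records name. It’s a simplification: the real template does its own conversion and also stores error details in the Deadletter table. The helper names are hypothetical.

```python
import json

# Hypothetical schema for the example table: column name -> BigQuery type.
SCHEMA = {"name": "STRING", "message": "STRING"}


def matches_schema(payload: str, schema: dict) -> bool:
    """Simplified check: the payload must be a JSON object with exactly the
    schema's keys, and every value must be a string (only STRING is modeled)."""
    try:
        record = json.loads(payload)
    except json.JSONDecodeError:
        return False  # not JSON at all
    if not isinstance(record, dict) or set(record) != set(schema):
        return False  # wrong shape, or missing/extra fields
    return all(
        schema[key] == "STRING" and isinstance(value, str)
        for key, value in record.items()
    )


def target_table(payload: str, output_table: str, schema: dict = SCHEMA) -> str:
    """Return the output table for matching messages, else the default
    Deadletter table name."""
    if matches_schema(payload, schema):
        return output_table
    return f"{output_table}_error_records"
```

For instance, target_table('{"name": "Angela"}', "test-project-310805:Test.pubsub") returns "test-project-310805:Test.pubsub_error_records", because the message field is missing.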

Once you’ve created a BigQuery table, go to the Table details and copy its ID. For example: 

test-project-310805:Test.pubsub

We’ll need this string later when setting up a PubSub to BigQuery connection. 
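The ID follows the project:dataset.table pattern. If you script any part of this setup, a small parser like the hypothetical one below (a sketch, not part of any Google library) helps catch a mistyped ID before it reaches Dataflow. It deliberately ignores edge cases such as domain-scoped project IDs.

```python
import re
from typing import NamedTuple


class TableSpec(NamedTuple):
    project: str
    dataset: str
    table: str


# "project:dataset.table" as shown in Table details.
# Simplified pattern: it does not enforce every BigQuery naming rule.
_TABLE_SPEC = re.compile(
    r"^(?P<project>[^:.]+):(?P<dataset>[^:.]+)\.(?P<table>[^:.]+)$"
)


def parse_table_spec(spec: str) -> TableSpec:
    """Split a table ID into its project, dataset, and table parts."""
    match = _TABLE_SPEC.match(spec.strip())
    if match is None:
        raise ValueError(f"expected project:dataset.table, got {spec!r}")
    return TableSpec(**match.groupdict())
```

For example, parse_table_spec("test-project-310805:Test.pubsub").dataset returns "Test". Keep the case exactly as shown in Table details: BigQuery dataset names are case-sensitive.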

Cloud Storage bucket

Go to Google Cloud Storage and click Create Bucket.

To create a bucket, you need to:
  • Give it a name
  • Choose a location to store your data
  • Choose a default storage class for your data
  • Choose access control
  • Choose protection tools

Click Create.

Once the bucket is created, go to the Configuration tab and copy its gsutil URI. For example:

gs://pubsub-bucket-01

We’ll need this string later when setting up a PubSub to BigQuery connection.  
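If you prefer code to clicking, the same bucket can be created with the google-cloud-storage Python client. This is a minimal sketch, assuming the library is installed and application default credentials are configured; the bucket name, location, and storage class are placeholders, and uniform bucket-level access is just one possible access-control choice. The gsutil_uri helper (hypothetical) builds the gs:// string we copy from the Configuration tab.

```python
def gsutil_uri(bucket_name: str, prefix: str = "") -> str:
    """Build a gs:// URI, optionally followed by an object prefix like 'temp'."""
    uri = f"gs://{bucket_name}"
    prefix = prefix.strip("/")
    return f"{uri}/{prefix}" if prefix else uri


def create_bucket(bucket_name: str, location: str = "us-central1",
                  storage_class: str = "STANDARD") -> str:
    """Create a bucket and return its gsutil URI.

    Assumes `pip install google-cloud-storage` and application default credentials.
    """
    from google.cloud import storage  # imported lazily; gsutil_uri works without it

    client = storage.Client()
    bucket = client.bucket(bucket_name)
    bucket.storage_class = storage_class  # default storage class
    # Access control: uniform bucket-level access (IAM only, no per-object ACLs).
    bucket.iam_configuration.uniform_bucket_level_access_enabled = True
    new_bucket = client.create_bucket(bucket, location=location)
    return gsutil_uri(new_bucket.name)
```

gsutil_uri("pubsub-bucket-01", "temp") gives gs://pubsub-bucket-01/temp, the temporary location used later in the Dataflow job.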

PubSub topic and subscription

  • Go to Pub/Sub in the Google Cloud Console and click Create Topic.
  • In the dialog window, enter the topic ID and click Create Topic. Optionally, you can set other parameters, such as “Use a schema”.
  • PubSub will create the new topic along with a default subscription. Scroll down to see it. You can also create a subscription yourself by clicking Create Subscription.
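The topic and subscription can also be created programmatically. Here is a sketch using the google-cloud-pubsub Python client, assuming it’s installed and authenticated; the project, topic, and subscription IDs are whatever you choose. The two path helpers are hypothetical, but they produce the projects/PROJECT_ID/topics/TOPIC_ID format that the Dataflow form asks for later.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Full topic name, e.g. projects/my-project/topics/my-topic."""
    return f"projects/{project_id}/topics/{topic_id}"


def subscription_path(project_id: str, subscription_id: str) -> str:
    """Full subscription name, e.g. projects/my-project/subscriptions/my-sub."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_topic_and_subscription(project_id: str, topic_id: str,
                                  subscription_id: str) -> None:
    """Create a topic and a pull subscription attached to it.

    Assumes `pip install google-cloud-pubsub` and application default credentials.
    """
    from google.cloud import pubsub_v1  # imported lazily

    publisher = pubsub_v1.PublisherClient()
    topic = topic_path(project_id, topic_id)
    publisher.create_topic(request={"name": topic})

    subscriber = pubsub_v1.SubscriberClient()
    with subscriber:  # closes the subscriber's channel when done
        subscriber.create_subscription(
            request={"name": subscription_path(project_id, subscription_id),
                     "topic": topic}
        )
```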

Now you can export data from PubSub to BigQuery.

Connect PubSub to BigQuery

You’ll need to create a Dataflow job to export data to a BigQuery table. For this, enable the Dataflow API first. 

  • Go to the APIs & Services dashboard.
  • Click Enable APIs and Services.
  • Find the Dataflow API using the search bar and click Enable.
  • Once the Dataflow API is enabled, go back to your PubSub topic and click Export to BigQuery. 

Specify the following parameters to create a Dataflow job:

  • Job name – name your job (lowercase letters, digits, and dashes only)
ps-to-bq-test
  • Regional endpoint – choose a regional endpoint to deploy worker instances
us-central1
  • Dataflow template – leave the current template or choose other options such as PubSub Subscription to BigQuery or PubSub Avro to BigQuery. 
PubSub Topic to BigQuery

Note: As per the documentation, the template is designed to output only a single table row per message.

  • Input PubSub topic – leave the suggested topic to read from or specify another topic.
projects/test-project-310714/topics/Test
  • BigQuery output table – paste the ID of your BigQuery table that we copied above
test-project-310714:test.pubsub
  • Temporary location – paste the path to your Cloud Storage bucket that we copied above, followed by a filename prefix for writing temporary files
gs://pubsub-bucket-01/temp

Optionally you can:

  • Specify the Cloud Storage path pattern for the JavaScript code containing your user-defined functions.
  • Specify the name of the function to call from your JavaScript file. 
  • Change the name of the Deadletter table that will be created for messages that failed to reach the output table.
  • Specify the maximum number of Google Compute Engine instances for your pipeline.
  • Specify the initial number of Google Compute Engine instances to use.
  • Change worker region and zone.
  • Specify the email address of the service account.
  • Specify the machine type for Google Compute Engine instances.
  • Specify additional experiment flags for the Dataflow job.
  • Change the Worker IP address configuration.
  • Specify the network to which workers will be assigned.
  • Specify the subnetwork to which workers will be assigned.
  • Enable Streaming Engine.

Leave the Google-managed encryption key and click Run Job.
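Since templates can also be run through the REST API, here is a hedged Python sketch of launching the same PubSub Topic to BigQuery template with google-api-python-client. The gs://dataflow-templates/latest/PubSub_to_BigQuery path is an assumption (Google also publishes regional copies and newer Flex versions), so check the template documentation before relying on it. The inputTopic and outputTableSpec parameters mirror the form fields above, and extra keyword arguments go into the runtime environment, which is where optional settings such as maxWorkers or enableStreamingEngine live. The sketch also checks the job name, because Dataflow only accepts lowercase letters, digits, and dashes.

```python
import re

# Dataflow job names: start with a lowercase letter, then lowercase letters,
# digits, or dashes, ending with a letter or digit.
JOB_NAME_RE = re.compile(r"[a-z]([-a-z0-9]{0,1022}[a-z0-9])?")

# Assumption: Google-hosted classic template path; verify the current
# location and version for your region before use.
TEMPLATE_PATH = "gs://dataflow-templates/latest/PubSub_to_BigQuery"


def build_launch_body(job_name, input_topic, output_table, temp_location,
                      **environment):
    """Build the request body for the Dataflow templates.launch method."""
    if not JOB_NAME_RE.fullmatch(job_name):
        raise ValueError(f"invalid Dataflow job name: {job_name!r}")
    return {
        "jobName": job_name,
        "parameters": {"inputTopic": input_topic, "outputTableSpec": output_table},
        # Optional runtime settings, e.g. maxWorkers=3, enableStreamingEngine=True.
        "environment": {"tempLocation": temp_location, **environment},
    }


def launch(project_id, region, body):
    """Launch the template (assumes google-api-python-client and credentials)."""
    from googleapiclient.discovery import build  # imported lazily

    dataflow = build("dataflow", "v1b3")
    request = dataflow.projects().locations().templates().launch(
        projectId=project_id, location=region, gcsPath=TEMPLATE_PATH, body=body
    )
    return request.execute()
```

For example, build_launch_body("ps-to-bq-test", "projects/test-project-310714/topics/Test", "test-project-310714:test.pubsub", "gs://pubsub-bucket-01/temp", maxWorkers=3) produces a body you can pass to launch() together with your project ID and us-central1.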

Congrats! You’ve successfully set up a streaming pipeline from PubSub to BigQuery. Let’s test it!

Export PubSub message to BigQuery – test run

Go to your PubSub topic, scroll down and select the Messages tab. Click the Publish Message button to proceed.

Insert your JSON-formatted message in the Message body field and click Publish.
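You can also publish the same test message from code. Here is a minimal sketch with the google-cloud-pubsub Python client, assuming it’s installed and authenticated; the function names are hypothetical. Pub/Sub message data is raw bytes, so the record is serialized to UTF-8 JSON first.

```python
import json


def encode_message(record: dict) -> bytes:
    """Serialize a record as UTF-8 JSON bytes for the Pub/Sub data field."""
    return json.dumps(record).encode("utf-8")


def publish_test_message(project_id: str, topic_id: str, record: dict) -> str:
    """Publish one JSON message and return the server-assigned message ID.

    Assumes `pip install google-cloud-pubsub` and application default credentials.
    """
    from google.cloud import pubsub_v1  # imported lazily

    publisher = pubsub_v1.PublisherClient()
    topic = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic, data=encode_message(record))
    return future.result()  # blocks until Pub/Sub confirms the publish
```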

The running pipeline will pick up the message; if you’ve just launched the job, give it a few minutes to start up. After that, you’ll see your message in the specified BigQuery table. Yippee!
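To check the result without opening the console, you can query the table with the google-cloud-bigquery client. This sketch assumes the example name and message columns and an installed, authenticated client. Because the table ID we copied uses the project:dataset.table form, the hypothetical helper converts it to the backtick-quoted project.dataset.table reference that standard SQL expects. Rows come back in no particular order.

```python
def to_standard_sql_table(table_spec: str) -> str:
    """Turn 'project:dataset.table' into a backtick-quoted standard SQL reference."""
    project, sep, dataset_table = table_spec.partition(":")
    if not sep or "." not in dataset_table:
        raise ValueError(f"expected project:dataset.table, got {table_spec!r}")
    return f"`{project}.{dataset_table}`"


def preview_rows(table_spec: str, limit: int = 10) -> None:
    """Print a few rows from the output table (assumes google-cloud-bigquery)."""
    from google.cloud import bigquery  # imported lazily; only needed here

    client = bigquery.Client()
    sql = (
        f"SELECT name, message FROM {to_standard_sql_table(table_spec)} "
        f"LIMIT {int(limit)}"
    )
    for row in client.query(sql).result():
        print(row["name"], row["message"])
```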

Why export data to BigQuery?

BigQuery is a data warehouse that you can use to accumulate records from multiple sources without any threat to your performance. You can even use BigQuery as a backup destination with the help of Coupler.io. 

Coupler.io is a solution for getting your data from multiple sources, such as HubSpot or QuickBooks, into BigQuery on a custom schedule. Check out all the available BigQuery integrations.

All you need to do is sign up for Coupler.io for free and complete three steps:

  • Set up your data source
  • Set up BigQuery as data destination
  • Set up your schedule

For example:

For now, Coupler.io supports 15+ sources and many more are expected in the future.

Can I export messages from BigQuery to a spreadsheet?

With Coupler.io, you can use BigQuery not only as a data destination but also as a data source. So, feel free to export your PubSub messages from your BigQuery table to Google Sheets and Excel automatically, on a schedule, for example, every Tuesday. The flow is basically the same:

  • Set up BigQuery as data source
  • Set up data destination 
  • Set up your schedule

Check out our guide on BigQuery Data Export to learn more.

PubSub to BigQuery: how much is the fish?

Streaming messages from PubSub to BigQuery using Dataflow is not free. Prices are quoted per hour, but usage of each of the following resources is billed per second:

  • vCPU
  • Memory
  • Storage
  • GPU (optional)

So each running job consumes these resources just like a Compute Engine instance would. Check out the Dataflow pricing page for more information.
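The arithmetic behind this is simple: multiply each resource’s hourly rate by the amount you use and by the job’s running time in hours, computed from seconds. Here is a Python sketch. HourlyRates is a hypothetical container, the rates are placeholders you’d fill in from the pricing page for your region, and GPUs and any other line items are left out.

```python
from dataclasses import dataclass


@dataclass
class HourlyRates:
    """Placeholder per-hour prices; take real values from the pricing page."""
    vcpu: float        # price per vCPU per hour
    memory_gb: float   # price per GB of memory per hour
    storage_gb: float  # price per GB of storage per hour


def estimate_job_cost(seconds: int, vcpus: int, memory_gb: float,
                      storage_gb: float, rates: HourlyRates) -> float:
    """Apply hourly rates to per-second usage for a single worker setup."""
    hours = seconds / 3600  # usage is metered per second
    return hours * (
        vcpus * rates.vcpu
        + memory_gb * rates.memory_gb
        + storage_gb * rates.storage_gb
    )
```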

And here is an example of costs that we incurred while testing different Dataflow jobs when writing this article.

PubSub to BigQuery without Dataflow – is it possible?

Dataflow with its templates is the most common option for streaming data from PubSub to BigQuery. However, you can always implement your own worker using the SDK for Python or your preferred programming language.
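One way to write such a worker yourself is an Apache Beam pipeline, the programming model that Dataflow executes. The sketch below assumes apache-beam[gcp] is installed and that messages are JSON objects matching the name/message schema from earlier; pass Dataflow options such as those in the docstring to run it on Dataflow. Unlike the template, it has no Deadletter table, so a malformed message raises an error in the parse step.

```python
import json


def parse_message(data: bytes) -> dict:
    """Decode a Pub/Sub payload (UTF-8 JSON bytes) into a BigQuery row dict."""
    return json.loads(data.decode("utf-8"))


def run(input_topic: str, output_table: str, pipeline_args=None) -> None:
    """Streaming Pub/Sub -> BigQuery pipeline (assumes apache-beam[gcp]).

    Example pipeline_args: ["--runner=DataflowRunner", "--project=PROJECT_ID",
    "--region=us-central1", "--temp_location=gs://BUCKET/temp"].
    """
    import apache_beam as beam  # imported lazily
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(pipeline_args, streaming=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=input_topic)
            | "ParseJson" >> beam.Map(parse_message)
            | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                output_table,  # "project:dataset.table"
                schema="name:STRING,message:STRING",  # the example schema
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )
        )
```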

Another option to consider is Google Cloud Functions, which works pretty well for simply moving data around. You can write a function in Node.js, Python, or another supported language. Here is an example of a cloud function that transfers data from PubSub to BigQuery.

import base64
import json
import os
import sys

from google.cloud import bigquery

# Created once per instance, so warm invocations reuse the client.
bigquery_client = bigquery.Client()


def pubsub_to_bigq(event, context):
    """Background Cloud Function triggered by a Pub/Sub message."""
    # Pub/Sub delivers the payload base64-encoded in event['data'].
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    print(pubsub_message)
    to_bigquery(os.environ['dataset'], os.environ['table'], json.loads(pubsub_message))


def to_bigquery(dataset, table, document):
    # Fully qualified table ID: project.dataset.table
    table_id = f"{bigquery_client.project}.{dataset}.{table}"
    # Streaming insert; returns a list of row errors (empty on success).
    errors = bigquery_client.insert_rows_json(table_id, [document])
    if errors:
        print(errors, file=sys.stderr)

You can create and deploy it using the Cloud Functions console.

Define the environment variables (dataset and table names) and there you go. Good luck with your data!
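Before wiring the function to a real topic, you can check its decoding step locally. The hypothetical helpers below build a fake event in the shape pubsub_to_bigq reads (base64-encoded JSON under the data key) and decode it the same way. Passing such an event to pubsub_to_bigq itself would perform a real insert, so that step needs credentials plus the dataset and table environment variables.

```python
import base64
import json


def make_pubsub_event(record: dict) -> dict:
    """Build a fake background-function event carrying a JSON record."""
    data = json.dumps(record).encode("utf-8")
    return {"data": base64.b64encode(data).decode("ascii")}


def decode_pubsub_event(event: dict) -> dict:
    """Mirror the decoding in pubsub_to_bigq: base64 -> UTF-8 -> JSON."""
    return json.loads(base64.b64decode(event["data"]).decode("utf-8"))
```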
