How to replay time series data from Google BigQuery to Pub/Sub
This tutorial explains how to replay time series data from a BigQuery table into a Pub/Sub topic. Common use cases include:
- Backtesting.
- Demos / Visualizations.
- Integration testing.
The go-to GCP service for moving data between services is Dataflow. However, while Google provides many Dataflow templates, none of them moves data from BigQuery to Pub/Sub.
That’s why we developed our own tool for this task: https://github.com/blockchain-etl/bigquery-to-pubsub. It can replay any BigQuery table that has a TIMESTAMP field. It’s a Python program that sequentially pulls chunks of data from a partitioned BigQuery table and publishes the rows as JSON messages to a Pub/Sub topic, at a pace governed by the rows' timestamps and the replay rate. Here is an example that highlights the basic options:
> python run_command.py \
--bigquery-table bigquery-public-data.crypto_ethereum.transactions \
--timestamp-field block_timestamp \
--start-timestamp 2019-10-23T00:00:00 \
--end-timestamp 2019-10-23T01:00:00 \
--batch-size-in-seconds 1800 \
--replay-rate 0.1 \
--pubsub-topic projects/${project}/topics/bigquery-to-pubsub-test0
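The following Python sketch illustrates the scheduling arithmetic behind `--start-timestamp`, `--end-timestamp`, `--batch-size-in-seconds` and `--replay-rate`. It is not the tool's actual code, and the function names are hypothetical. It assumes two things. First, the time range is pulled in consecutive windows of `--batch-size-in-seconds`. Second, each row is published after a wall-clock delay equal to its offset from the start timestamp multiplied by the replay rate, so 0.1 would replay ten times faster than real time. Check the repository for the exact semantics.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def batch_windows(start: datetime, end: datetime,
                  batch_size_in_seconds: int) -> List[Tuple[datetime, datetime]]:
    """Split [start, end) into consecutive windows of at most batch_size_in_seconds."""
    if batch_size_in_seconds <= 0:
        raise ValueError("batch_size_in_seconds must be positive")
    step = timedelta(seconds=batch_size_in_seconds)
    windows = []
    window_start = start
    while window_start < end:
        # The last window is truncated so it never extends past end.
        window_end = min(window_start + step, end)
        windows.append((window_start, window_end))
        window_start = window_end
    return windows


def publish_delay_seconds(row_timestamp: datetime, start: datetime,
                          replay_rate: float) -> float:
    """Wall-clock seconds after the replay starts at which a row would be published.

    Assumption (hypothetical semantics): delay = offset from start * replay_rate.
    """
    return (row_timestamp - start).total_seconds() * replay_rate


if __name__ == "__main__":
    start = datetime(2019, 10, 23, 0, 0, 0)
    end = datetime(2019, 10, 23, 1, 0, 0)
    # Same range and batch size as the command above: one hour in 1800-second windows.
    for window_start, window_end in batch_windows(start, end, 1800):
        print(window_start.isoformat(), "->", window_end.isoformat())
    # A row stamped 00:30:00 is 1800 s into the range; scaled by 0.1 that is 180 s.
    print(publish_delay_seconds(datetime(2019, 10, 23, 0, 30, 0), start, 0.1))
```

With the options from the example above, the hour from 00:00 to 01:00 is split into two 30-minute batches.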
Below you’ll find detailed instructions for replaying Ethereum transactions from the Ethereum dataset in BigQuery.
1. Create a Service Account with the following roles:
   - BigQuery Admin
   - Storage Admin
   - Pub/Sub Publisher
2. Create a key file for the Service Account and download it as credentials_file.json.
3. Create a Pub/Sub topic called bigquery-to-pubsub-test0:
> gcloud pubsub topics create bigquery-to-pubsub-test0
4. Create a temporary GCS bucket and a temporary BigQuery dataset:
> git clone https://github.com/blockchain-etl/bigquery-to-pubsub.git
> cd bigquery-to-pubsub
> bash create_temp_resources.sh
5. Run the replay for Ethereum transactions (replace /path_to_credentials_file/ with the directory that contains credentials_file.json):
> docker build -t bigquery-to-pubsub:latest -f Dockerfile .
> project=$(gcloud config get-value project 2> /dev/null)
> temp_resource_name=$(./get_temp_resource_name.sh)
> echo "Replaying Ethereum transactions"
> docker run \
-v /path_to_credentials_file/:/bigquery-to-pubsub/ --env GOOGLE_APPLICATION_CREDENTIALS=/bigquery-to-pubsub/credentials_file.json \
bigquery-to-pubsub:latest \
--bigquery-table bigquery-public-data.crypto_ethereum.transactions \
--timestamp-field block_timestamp \
--start-timestamp 2019-10-23T00:00:00 \
--end-timestamp 2019-10-23T01:00:00 \
--batch-size-in-seconds 1800 \
--replay-rate 0.1 \
--pubsub-topic projects/${project}/topics/bigquery-to-pubsub-test0 \
--temp-bigquery-dataset ${temp_resource_name} \
--temp-bucket ${temp_resource_name}
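While the replay runs, each row arrives on bigquery-to-pubsub-test0 as a JSON message. An integration test typically decodes the payload and checks that it falls inside the replay window. The sketch below works on raw message bytes, so it needs no Pub/Sub client library. The field names `hash` and `block_timestamp` come from the crypto_ethereum.transactions schema. The timestamp string formats it accepts are assumptions about how the tool serializes TIMESTAMP values, and the sample payload is made up.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def parse_timestamp(value: str) -> datetime:
    """Parse e.g. '2019-10-23 00:00:05 UTC' or '2019-10-23T00:00:05+00:00'.

    The accepted formats are assumptions; naive values are treated as UTC.
    """
    text = value.strip()
    if text.endswith(" UTC"):
        text = text[: -len(" UTC")]
    elif text.endswith("Z"):
        text = text[:-1]
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def decode_message(data: bytes) -> Dict[str, Any]:
    """Decode a Pub/Sub message payload (UTF-8 encoded JSON) into a dict."""
    return json.loads(data.decode("utf-8"))


def in_replay_window(row: Dict[str, Any], timestamp_field: str,
                     start: datetime, end: datetime) -> bool:
    """Check that the row's timestamp falls inside [start, end)."""
    return start <= parse_timestamp(row[timestamp_field]) < end


if __name__ == "__main__":
    # Made-up payload shaped like a crypto_ethereum.transactions row.
    payload = b'{"hash": "0xabc", "block_timestamp": "2019-10-23 00:12:34 UTC"}'
    row = decode_message(payload)
    start = datetime(2019, 10, 23, 0, 0, tzinfo=timezone.utc)
    end = datetime(2019, 10, 23, 1, 0, tzinfo=timezone.utc)
    print(row["hash"], in_replay_window(row, "block_timestamp", start, end))
```

The window check mirrors `--start-timestamp` and `--end-timestamp` from the command above. Pub/Sub does not guarantee delivery order by default, so the sketch deliberately avoids asserting that messages arrive sorted by timestamp.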
Running in Google Kubernetes Engine
For long-running replays it’s convenient to start the process in the cloud. Google Kubernetes Engine is a good choice as it provides Stackdriver logging and health monitoring out of the box.
Below are the instructions for spawning a replay job in GKE using Helm:
1. Clone the GitHub repository:
> git clone https://github.com/blockchain-etl/bigquery-to-pubsub.git
> cd bigquery-to-pubsub/helm
2. Create and initialize the GKE cluster, the Pub/Sub topic, and the temporary GCS bucket and BigQuery dataset:
> bash scripts/setup.sh
3. Copy the example values file and edit it:
> cp values-sample.yaml values-dev.yaml
> vim values-dev.yaml
4. Set both tempBigqueryDataset and tempBucket to the value printed by bash scripts/get_temp_resource_name.sh.
5. Install the chart:
> helm install bigquery-to-pubsub/ --name bigquery-to-pubsub-0 --values values-dev.yaml
6. Inspect the output of kubectl get pods. The job is done when the status is "Completed". Use kubectl logs <POD> -f to see the progress.
7. Clean up the resources:
> bash scripts/cleanup.sh
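The "Completed" status that kubectl get pods shows in step 6 corresponds to a pod whose status.phase is Succeeded. To wait for the job from a script rather than watching the output, you could poll kubectl get pods -o json and inspect each pod's phase, as in this minimal sketch. It assumes kubectl is on the PATH and already points at the cluster. The label selector you pass in is hypothetical, so check which labels the chart actually sets.

```python
import json
import subprocess
import time
from typing import Any, Dict, Optional


def pod_phases(pods_json: Dict[str, Any]) -> Dict[str, str]:
    """Map pod name -> status.phase from the output of `kubectl get pods -o json`."""
    return {
        item["metadata"]["name"]: item.get("status", {}).get("phase", "Unknown")
        for item in pods_json.get("items", [])
    }


def replay_finished(phases: Dict[str, str]) -> Optional[bool]:
    """True if every pod succeeded, False if any failed, None while still running."""
    if not phases:
        return None  # No pods matched yet; keep waiting.
    if any(phase == "Failed" for phase in phases.values()):
        return False
    if all(phase == "Succeeded" for phase in phases.values()):
        return True
    return None


def wait_for_replay(selector: str, poll_seconds: float = 30.0) -> bool:
    """Poll kubectl until the pods matching selector finish; returns True on success."""
    while True:
        output = subprocess.run(
            ["kubectl", "get", "pods", "-l", selector, "-o", "json"],
            check=True, capture_output=True, text=True,
        ).stdout
        result = replay_finished(pod_phases(json.loads(output)))
        if result is not None:
            return result
        time.sleep(poll_seconds)
```

For example, wait_for_replay("app=bigquery-to-pubsub") would block until every matching pod has either succeeded or failed. Here app=bigquery-to-pubsub is a placeholder selector. After it returns, you can run the cleanup script.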
Next up is an article showing how to do backtesting using Dataflow and this tool.