Real-time Crypto Price Anomaly Detection with Deep Learning and Band Protocol

Evgeny Medvedev
Google Cloud - Community
Apr 15, 2021 · 6 min read


Immediate and accurate analysis of financial time series data is crucial to the price discovery mechanism that is at the heart of capital markets.

We’ll show you how to derive insights from financial time series data in real time using machine learning. In particular, we provide a Keras model implementing an LSTM neural network for anomaly detection.

The data we’re using comes from the Band Protocol public dataset available in Google BigQuery. Band Protocol is a cross-chain data oracle platform that aggregates and connects real-world data and APIs to smart contracts. This sample query gives you the latest and historical prices in USD of 100 currency pairs, including $ETH, $BTC and $BAND.

Analysis at a glance

The whole process can be broken down into two parts:

Part 1: Metrics calculation using Dataflow. The following metrics and technical indicators are included: Relative Strength Index (RSI), Moving Average (MA), and Open, High, Low, Close (OHLC).

Part 2: Anomaly detection. We are using an LSTM model implemented in Keras.

Now, let’s go through the details.

Part 1: Metrics Calculation

Start with checking out the source code repository:

git clone https://github.com/blockchain-etl/band-dataflow-sample-applications.git
cd band-dataflow-sample-applications
cd timeseries-streaming/timeseries-java-applications

Bootstrap the metrics from historical data

The BandDataBootstrapGenerator pipeline allows you to calculate metrics for historical oracle requests. The data are sourced from the BigQuery table public-data-finance.crypto_band.oracle_requests.

Initialize variables:

PROJECT=$(gcloud config get-value project 2> /dev/null)    
# Make sure to update the date to yesterday's date
TIMESTAMP_THRESHOLD="2020-10-01T07:00:00.0Z"
DATASET_NAME=crypto_band
TABLE_NAME=metrics
TEMP_BUCKET=<replace_with_your_temporary_bucket>

Create a BigQuery dataset:

bq --location=US mk --dataset $DATASET_NAME 

Run the pipeline:

./gradlew generate_band_bootstrap_data --args="\
--bigQueryTableForTSAccumOutputLocation=$PROJECT:$DATASET_NAME.$TABLE_NAME \
--timestampThreshold="$TIMESTAMP_THRESHOLD" \
--runner=DataflowRunner \
--tempLocation=gs://$TEMP_BUCKET/temp \
--maxNumWorkers=1 \
--region=us-central1 \
"

The pipeline will be deployed to Dataflow and will take between 30 and 60 minutes, depending on the amount of historical data.

The “bootstrap metrics” Dataflow pipeline

The pipeline uses 10-minute aggregation windows for OHLC and 60-minute rolling windows for moving average and RSI calculation by default. You can customize the parameters with --typeOneComputationsLengthInSecs and --typeTwoComputationsLengthInSecs, respectively.
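
To give an intuition for what the RSI metric represents, here is a minimal, illustrative Python sketch of a simplified RSI calculation over a list of closing prices. It uses plain averages of gains and losses and is not necessarily the exact formula the pipeline uses; the 14-period default and the example prices are assumptions for illustration only:

def simple_rsi(closes, period=14):
    """Simplified Relative Strength Index over the last `period` price changes.

    Illustrative only -- not the pipeline's exact implementation.
    """
    if len(closes) < period + 1:
        raise ValueError("need at least period + 1 prices")
    # Consecutive price changes over the window
    deltas = [b - a for a, b in zip(closes[-period - 1:-1], closes[-period:])]
    avg_gain = sum(d for d in deltas if d > 0) / period
    avg_loss = sum(-d for d in deltas if d < 0) / period
    if avg_loss == 0:
        return 100.0
    rs = avg_gain / avg_loss
    return 100.0 - 100.0 / (1.0 + rs)

# Example: RSI over 15 hypothetical ETH prices
print(simple_rsi([380, 382, 381, 385, 390, 388, 392, 391, 395, 398, 396, 400, 402, 401, 405]))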

You can observe the results in the table you created earlier, or query the public table band-etl.crypto_band.metrics:

SELECT * 
FROM `band-etl.crypto_band.metrics`
--WHERE timeseries_key = 'ETH'
ORDER BY lower_window_boundary DESC
LIMIT 1000
band-etl.crypto_band.metrics table in BigQuery console

You can query RSI for $ETH for the previous day:

SELECT upper_window_boundary as time, data.dbl_data as RSI 
FROM `band-etl.crypto_band.metrics`
CROSS JOIN UNNEST(data) AS data
WHERE timeseries_key = 'ETH'
AND data.metric = 'RELATIVE_STRENGTH_INDICATOR'
ORDER BY lower_window_boundary DESC
LIMIT 144

And visualize it in Data Studio right from the BigQuery console:

Ethereum Relative Strength Index (RSI)
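
If you prefer to pull the RSI values into Python instead of Data Studio, the same query can be run with the BigQuery client library. A minimal sketch, assuming the google-cloud-bigquery package is installed and default credentials are configured:

from google.cloud import bigquery

client = bigquery.Client()

sql = """
SELECT upper_window_boundary AS time, data.dbl_data AS rsi
FROM `band-etl.crypto_band.metrics`
CROSS JOIN UNNEST(data) AS data
WHERE timeseries_key = 'ETH'
  AND data.metric = 'RELATIVE_STRENGTH_INDICATOR'
ORDER BY lower_window_boundary DESC
LIMIT 144
"""

# Print the last day of 10-minute RSI windows for ETH
for row in client.query(sql).result():
    print(row.time, row.rsi)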

Calculate metrics from a Pub/Sub stream

The BandDataStreamGenerator pipeline allows you to calculate metrics for oracle requests pulled from a Pub/Sub subscription to the public topic projects/public-data-finance/topics/crypto_band.oracle_requests. It will output the results to a specified BigQuery table and a specified Pub/Sub topic.

Create a Pub/Sub subscription for Band Protocol oracle requests:

gcloud pubsub subscriptions create crypto_band.oracle_requests.metrics \
--topic=crypto_band.oracle_requests \
--topic-project=public-data-finance

Initialize the variables:

PROJECT=$(gcloud config get-value project 2> /dev/null)    
# Make sure to update the date to yesterday's date
TIMESTAMP_THRESHOLD="2020-10-01T07:00:00.0Z"
DATASET_NAME=crypto_band
TABLE_NAME=metrics
TEMP_BUCKET=<replace_with_your_temporary_bucket>
TOPIC_NAME=crypto_band.metrics

Start the pipeline in Dataflow:

./gradlew run_band_example --args="\
--pubSubSubscriptionForOracleRequests=projects/$PROJECT/subscriptions/crypto_band.oracle_requests.metrics \
--bigQueryTableForTSAccumOutputLocation=$PROJECT:$DATASET_NAME.$TABLE_NAME \
--timestampThreshold="$TIMESTAMP_THRESHOLD" \
--pubSubTopicForTSAccumOutputLocation=projects/$PROJECT/topics/$TOPIC_NAME \
--runner=DataflowRunner \
--maxNumWorkers=1 \
--workerMachineType=n1-standard-1 \
--diskSizeGb=30 \
--region=us-central1 \
"

The output will be streamed to the specified BigQuery table and the Pub/Sub topic.
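
To consume the streamed metrics outside of BigQuery, you can attach your own subscription to the output topic. Below is a minimal Python sketch using the google-cloud-pubsub client; the subscription name crypto_band.metrics.consumer and the project placeholder are assumptions, and the payload is printed raw because its encoding depends on how the pipeline serializes the TSAccum records:

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

project_id = "<your-project>"                      # assumption: replace with your project ID
subscription_id = "crypto_band.metrics.consumer"   # assumption: a subscription you created on the crypto_band.metrics topic

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Payload encoding depends on the pipeline's output serialization
    print(f"Received {len(message.data)} bytes: {message.data[:80]!r}...")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for metrics on {subscription_path}...")

with subscriber:
    try:
        streaming_pull_future.result(timeout=60)  # listen for one minute
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()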

Part 2: Anomaly Detection

The examples provided below are intended to explore the data engineering needed to work with Band Protocol data and deliver it to an LSTM autoencoder-decoder.

Part 2 can be broken down into three steps:

  1. Generating TF.Examples for training the model.
  2. Training an LSTM model on training data.
  3. Batch anomaly detection.

Generating TF.Examples with Band Protocol data

To build the model, you first need to run the generator job BandDataBootstrapGenerator.java, which creates training data as TF.Examples:

git clone https://github.com/blockchain-etl/band-dataflow-sample-applications.git
cd band-dataflow-sample-applications/timeseries-streaming/timeseries-java-applications

Run the Dataflow job (make sure to replace <your_temp_bucket> with your value and update the TIMESTAMP_THRESHOLD variable):

TIMESTAMP_THRESHOLD="2020-10-03T00:00:00.0Z"
BUCKET=<your_temp_bucket>
./gradlew generate_band_bootstrap_data --args="\
--interchangeLocation=gs://$BUCKET/band_bootstrap_tfexamples/run0 \
--timestampThreshold="$TIMESTAMP_THRESHOLD" \
--runner=DataflowRunner \
--tempLocation=gs://$BUCKET/temp \
--maxNumWorkers=1 \
--region=us-central1 \
"

Once the job is done, download the generated files from gs://$BUCKET/band_bootstrap_tfexamples/run0 and update config.py to match your local environment.
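
To sanity-check the downloaded training data before updating config.py, you can peek at a few records with TensorFlow. A minimal sketch, assuming the files were saved locally under ./data; if they are gzip-compressed, pass compression_type="GZIP" to TFRecordDataset:

import glob
import tensorflow as tf

# Assumption: generated TF.Example files downloaded to ./data
files = glob.glob("./data/*")

dataset = tf.data.TFRecordDataset(files)

# Print the first three records as parsed tf.train.Example protos
for raw_record in dataset.take(3):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)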

Train the model

Set up a virtual environment:

virtualenv -p python3.7 streaming-tf-consumer
source streaming-tf-consumer/bin/activate

Install the dependencies:

git clone https://github.com/blockchain-etl/band-dataflow-sample-applications.git
cd band-dataflow-sample-applications/timeseries-streaming/timeseries-python-applications
cd MLPipeline
pip install -e .
cd ..
ls

Run the following command with the virtual environment activated:

python MLPipelineExamples/test_pipelines/timeseries_local_simple_data.py

You should see the model training, with output like the following:

....
Epoch 26/30
280/280 [==============================] - 6s 21ms/step - loss: 119.8072 - mean_absolute_error: 6.8178 - val_loss: 684588.3750 - val_mean_absolute_error: 670.2068
Epoch 27/30
280/280 [==============================] - 7s 23ms/step - loss: 119.6002 - mean_absolute_error: 6.8087 - val_loss: 203.5257 - val_mean_absolute_error: 8.6160
Epoch 28/30
280/280 [==============================] - 6s 20ms/step - loss: 119.5842 - mean_absolute_error: 6.8084 - val_loss: 41512.6406 - val_mean_absolute_error: 160.1564
Epoch 29/30
280/280 [==============================] - 6s 20ms/step - loss: 119.5832 - mean_absolute_error: 6.8084 - val_loss: 5213.2568 - val_mean_absolute_error: 58.4286
Epoch 30/30
280/280 [==============================] - 5s 19ms/step - loss: 119.5791 - mean_absolute_error: 6.8083 - val_loss: 24351.4551 - val_mean_absolute_error: 151.2784

This will output a serving_model_dir under the location you specified for PIPELINE_ROOT in the config.py file. With this, you can now follow the rest of the steps below using your own model.
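
Before running batch inference, you can check that the exported model loads and inspect its serving signature. A minimal sketch, assuming the serving_model_dir path produced by the training step (the same path is passed to --saved_model_location below):

import tensorflow as tf

# Assumption: path to the exported model from the training step
model_dir = "./build/Trainer/model/5/serving_model_dir"

loaded = tf.saved_model.load(model_dir)
print("Available signatures:", list(loaded.signatures.keys()))

serving_fn = loaded.signatures["serving_default"]
print("Inputs:", serving_fn.structured_input_signature)
print("Outputs:", serving_fn.structured_outputs)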

Batch inference

The pipeline defined in batch_inference.py reads the given TF.Examples and the saved model, runs inference to get a predicted value, and compares this value with the actual value; if the difference is greater than the threshold, the value is reported as anomalous.
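
The core of that check is simple: compare each reconstructed value with its input and flag large reconstruction errors. Below is a minimal, illustrative Python sketch of the logic; the find_outliers helper, the flat list of values, and the threshold of 100 are assumptions for the example (the actual pipeline applies this per metric and per time window):

from typing import List, Tuple

def find_outliers(inputs: List[float],
                  reconstructions: List[float],
                  threshold: float) -> List[Tuple[int, float]]:
    """Return (index, difference) pairs where the reconstruction error exceeds the threshold."""
    outliers = []
    for i, (x, x_hat) in enumerate(zip(inputs, reconstructions)):
        diff = abs(x - x_hat)
        if diff > threshold:
            outliers.append((i, diff))
    return outliers

# Hypothetical example: actual ETH prices vs. autoencoder reconstructions
print(find_outliers([388.89, 392.10], [109.14, 81.37], threshold=100.0))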

Run the command with the virtual environment activated, providing the location of the model built in the previous step with --saved_model_location, and the location of the generated data you downloaded from the GCS bucket with --tfrecord_folder:

python MLPipelineExamples/test_pipelines/batch_inference.py \
--saved_model_location=./build/Trainer/model/5/serving_model_dir \
--tfrecord_folder=/<your-directory>/data/*

You will see messages reporting detected outliers:

...
Outlier detected for ETH-value-LAST at 2020-08-23 21:20:00 - 2020-08-23 22:20:00 Difference was 279.74764251708984 for value input 388.8900146484375 output 109.14237213134766 with raw data [388.8900146484375]
Outlier detected for ETH-value-FIRST at 2020-08-22 14:20:00 - 2020-08-22 15:20:00 Difference was 310.73509216308594 for value input 392.1044921875 output 81.36940002441406 with raw data [392.1044921875]
Outlier detected for ETH-value-LAST at 2020-08-22 14:20:00 - 2020-08-22 15:20:00 Difference was 308.1451416015625 for value input 392.1044921875 output 83.9593505859375 with raw data [392.1044921875]

Note 1: the intent of this sample is to demonstrate the data engineering effort needed to support data generated from a streaming Beam pipeline and delivered to an LSTM autoencoder-decoder. It is not intended to demonstrate state-of-the-art machine learning approaches to anomaly detection, and more work needs to be done to optimize the model for better performance.

Note 2: the LSTM used here is pre-trained and deployed, and not learning online. Extending this sample for online learning in micro-batches is left as an exercise for the reader.

References

This article is based on the work of Reza Rokni. For more examples of time series data processing in Dataflow refer to this repository:

https://github.com/GoogleCloudPlatform/dataflow-sample-applications/tree/master/timeseries-streaming.
