How to build a crypto asset database for quant trading (with MongoDB and Python) - PART I

July 26, 2022



Data is the foundation for every quant-based strategy. Before we can backtest trading strategies or perform any type of quantitative analysis, we need to acquire and store high-quality data. With open-source software and free APIs, we can develop infrastructure that is built to scale.

What you’ll learn

The goal of this tutorial is to create a financial master database that can accommodate any kind of data to be used for quantitative analysis. In the end, we will have a scalable MongoDB database containing a wide variety of historical data pertaining to the analysis and prediction of cryptocurrency markets. We will be able to easily store and query data with Python and use pandas for our analysis. In the later part of this series, we’re going to build a dual momentum strategy (time series and cross-sectional momentum) based on this data.

In this first part of the tutorial, we’re creating a script to download all coins listed on Coingecko along with stats like total supply, current supply and a few other metrics Coingecko provides. The data will then be stored in MongoDB and queried through pymongo. In the second part of the tutorial, we’ll get the entire set of historical data, such as prices, daily volume, market capitalization and fully diluted valuations for all these coins.

Requirements

- Python and Jupyter Notebook installed

Set up MongoDB

After trying several solutions and systems to store financial data (pystore, folders with csv’s, hdf5, etc.), I found that MongoDB offers the most versatile solution for my needs. Not only is it incredibly fast and storage-efficient, but it’s also widely used in the financial industry, including at big investment managers such as Man AHL. It provides a great UI through a standalone app called MongoDB Compass, and comes with powerful query operations that we’ll utilize later on to streamline our analysis.

Let’s start by installing a local instance of MongoDB:

1. Install MongoDB

The below instructions are for macOS. For other operating systems, check out the docs here: https://www.mongodb.com/docs/manual/administration/install-community/

# Prerequisite: XCode
xcode-select --install

# Prerequisite: Package manager homebrew
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

# Download homebrew formula for MongoDB
brew tap mongodb/brew

# Update homebrew
brew update

# Install MongoDB
brew install mongodb-community@5.0

# Run MongoDB 
brew services start mongodb-community@5.0

2. Connect Python to MongoDB - pymongo

PyMongo is a Python package for interacting with MongoDB. Open a terminal and install it with: pip install pymongo

Now that we have everything set up, we can switch over to Jupyter notebook. I’ve recently started to use Visual Studio Code as my Jupyter notebook environment instead of using the browser-based version and can highly recommend it.

Let’s import pymongo and create a MongoDB client instance. By default, MongoDB is running on localhost port 27017. We’ll also need to import pandas, pycoingecko and some other packages mentioned in the import statement below (pip install [package-name] if not installed already):

from pycoingecko import CoinGeckoAPI
from datetime import datetime, timezone
import pandas as pd
import time
import pymongo

client = pymongo.MongoClient("mongodb://localhost:27017")

# load Coingecko API
cg = CoinGeckoAPI()

3. Create MongoDB collection and index

We then create a database called “coingecko_data” and store it as db. MongoDB stores data records gathered together in so-called collections. Collections are essentially folders for your documents. We’re now creating a new collection called coins, which we will use to store descriptive data, such as a coin’s ID, current market cap or total supply.

db = client["coingecko_data"]
coins = db["coins"]

In the coins collection, we create a unique MongoDB index so that we don’t end up with duplicate records. Every document we write has to be unique on the combination of the keys id and last_updated; otherwise, MongoDB throws an error. The parameters 1 and -1 set the index sort order: ascending on id and descending on last_updated.

coins.create_index([('id', 1),("last_updated", -1)], unique=True)
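
To make the uniqueness rule concrete, here is a small pure-Python toy model (not MongoDB itself) of which documents a unique index on (id, last_updated) would reject. The helper name find_rejected and the sample documents are made up for illustration.

```python
def find_rejected(documents, key_fields=("id", "last_updated")):
    """Return the documents that repeat an already-seen combination of key_fields.

    Toy stand-in for a unique compound index: the first document with a given
    key is kept, every later document with the same key counts as a duplicate.
    """
    seen = set()
    rejected = []
    for doc in documents:
        key = tuple(doc.get(field) for field in key_fields)
        if key in seen:
            rejected.append(doc)
        else:
            seen.add(key)
    return rejected


# Hypothetical snapshots, for illustration only
snapshots = [
    {"id": "bitcoin", "last_updated": "2022-07-25T10:00:00Z"},
    {"id": "bitcoin", "last_updated": "2022-07-26T10:00:00Z"},   # new timestamp: accepted
    {"id": "bitcoin", "last_updated": "2022-07-26T10:00:00Z"},   # same pair again: rejected
    {"id": "ethereum", "last_updated": "2022-07-26T10:00:00Z"},  # different id: accepted
]

rejected = find_rejected(snapshots)
print(len(rejected))  # → 1
```

Because last_updated is part of the key, the same coin can be stored again whenever Coingecko reports a newer timestamp, which lets the collection accumulate several snapshots per coin.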

4. Download Descriptive Data from Coingecko

The get_coins_list() method fetches information on all coins currently listed on Coingecko.

data_coins = cg.get_coins_list()
data_coins = pd.DataFrame(data_coins)
data_coins

This will return the below dataframe. The id is the coingecko-id, a unique identifier for each coin that is frequently referenced by other crypto APIs as well.

Screenshot 2022-07-04 at 19.05.00.png

We now have all the ids, but we also want some more stats, such as the circulating or total supply of each coin. We’re going to loop through all coin ids and query the Coingecko API for each one. Coingecko’s free tier has a variable rate limit of around 10-50 API calls per minute, so we need to be sure not to exceed this.

We wrap the API call in a for loop and use try/except to catch errors if data couldn’t be retrieved from Coingecko. Usually, the error is due to the rate limit, so we wait 30 seconds and then retry the API call, up to five times.

for i in range(0, len(data_coins["id"])):
    
    # read coin id from data_coins dataframe
    coin_id = data_coins["id"][i]
    
    # trying max 5 times to connect to coingecko API
    for attempt in range(5):
    
        try: 
            data = cg.get_coin_by_id(id = coin_id, vs_currency = "usd")
            print(f"✅ retrieved data for {coin_id}")

        except Exception as e:
            print(f"❌ error retrieving data from coingecko for {coin_id}: {e}. Waiting 30s to try again")

            time.sleep(30)

        else:
            break
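
The retry pattern above can also be factored into a small reusable helper. This is only a sketch: call_with_retries and its parameters are hypothetical names, not part of pycoingecko. It returns None when every attempt fails, so the caller can skip the coin explicitly.

```python
import time


def call_with_retries(func, max_attempts=5, wait_seconds=30, sleep=time.sleep):
    """Call func() up to max_attempts times, pausing between failed attempts.

    Returns the result of the first successful call, or None if all attempts fail.
    The sleep argument is injectable so the helper can be tried out without waiting.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as e:
            print(f"attempt {attempt}/{max_attempts} failed: {e}")
            # no point in waiting after the final attempt
            if attempt < max_attempts:
                sleep(wait_seconds)
    return None
```

Inside the loop it could be used as data = call_with_retries(lambda: cg.get_coin_by_id(id=coin_id)), followed by a check for None. That check matters: in the loop as written, if all five attempts fail, data still holds the response for the previous coin.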

We continue in the same for loop and write all the retrieved data to our data_coins dataframe.

    try: 
        data_coins.loc[data_coins["id"]==coin_id, "last_updated"] = data["last_updated"]
        data_coins.loc[data_coins["id"]==coin_id, "total_supply"] = data["market_data"]["total_supply"]
        data_coins.loc[data_coins["id"]==coin_id, "max_supply"] = data["market_data"]["max_supply"]
        data_coins.loc[data_coins["id"]==coin_id, "circulating_supply"] = data["market_data"]["circulating_supply"]
        data_coins.loc[data_coins["id"]==coin_id, "fully_diluted_valuation"] = data["market_data"]["fully_diluted_valuation"].get("usd")
        data_coins.loc[data_coins["id"]==coin_id, "market_cap"] = data["market_data"]["market_cap"]["usd"]
        data_coins.loc[data_coins["id"]==coin_id, "coingecko_rank"] = data["coingecko_rank"]
        data_coins.loc[data_coins["id"]==coin_id, "coingecko_score"] = data["coingecko_score"]
        data_coins.loc[data_coins["id"]==coin_id, "developer_score"] = data["developer_score"]
        data_coins.loc[data_coins["id"]==coin_id, "community_score"] = data["community_score"]
        data_coins.loc[data_coins["id"]==coin_id, "liquidity_score"] = data["liquidity_score"]
        data_coins.loc[data_coins["id"]==coin_id, "public_interest_score"] = data["public_interest_score"]

    except Exception as e:
        print(f"❌ error writing data for {coin_id}: {e}")
        pass
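
As an alternative to the single try block, the fields can be pulled out with .get() so that one missing key does not discard the remaining values. This is a sketch under assumptions: extract_coin_stats is a hypothetical helper, and it assumes that market_cap and fully_diluted_valuation are dictionaries keyed by currency, which may be empty for some coins.

```python
def extract_coin_stats(data, currency="usd"):
    """Flatten the fields used in this tutorial into one dict; missing values become None."""
    market = data.get("market_data") or {}
    stats = {
        "last_updated": data.get("last_updated"),
        "total_supply": market.get("total_supply"),
        "max_supply": market.get("max_supply"),
        "circulating_supply": market.get("circulating_supply"),
        # assumed to be per-currency dicts, possibly empty
        "fully_diluted_valuation": (market.get("fully_diluted_valuation") or {}).get(currency),
        "market_cap": (market.get("market_cap") or {}).get(currency),
    }
    for key in ("coingecko_rank", "coingecko_score", "developer_score",
                "community_score", "liquidity_score", "public_interest_score"):
        stats[key] = data.get(key)
    return stats


# Hypothetical, heavily shortened response for illustration only
sample = {
    "last_updated": "2022-07-26T10:00:00Z",
    "market_data": {
        "total_supply": 21000000.0,
        "market_cap": {"usd": 400.0},
        "fully_diluted_valuation": {},
    },
    "coingecko_rank": 1,
}

stats = extract_coin_stats(sample)
print(stats["market_cap"], stats["fully_diluted_valuation"])  # → 400.0 None
```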

Now that the data for the current coin is in the data_coins dataframe, we store it in MongoDB. There are a few things going on at once in this statement. First, we select only the row for the current coin and convert it to a list of dictionaries (one per row) with the .to_dict("records") method. We then pass that list to pymongo’s insert_many function, which inserts many documents at once. With ordered = False, MongoDB attempts every insert in the batch instead of stopping at the first error.

    try:
        coins.insert_many(data_coins.loc[data_coins["id"] == coin_id].to_dict("records"), ordered = False)

        print(f"✅ Successfully stored data for {coin_id} in MongoDB.")

Pymongo raises a BulkWriteError when a write in the batch fails; duplicate documents show up in it with error code 11000. We catch this and print a warning. If the error has any other cause, we need to inspect it more closely, so we print the full error details as well.

    except pymongo.errors.BulkWriteError as e:

        # Collect every write error that isn't a duplicate-key error (code 11000)
        panic_list = list(filter(lambda x: x['code'] != 11000, e.details['writeErrors']))
        if len(panic_list) > 0:
            print(f"❌ these are not duplicate errors {panic_list}")
            
        else: print(f"❗ This data for {coin_id} already exists in MongoDB")
        pass
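
The filtering step can be tried out without a database by applying it to a hand-written details dictionary. The helper name split_write_errors and the sample values are made up for illustration; the "writeErrors" list with a "code" per entry mirrors what the except block above reads from e.details.

```python
DUPLICATE_KEY = 11000  # MongoDB error code for a duplicate key


def split_write_errors(details):
    """Split the write errors of a bulk write into (duplicates, everything else)."""
    errors = details.get("writeErrors", [])
    duplicates = [err for err in errors if err.get("code") == DUPLICATE_KEY]
    others = [err for err in errors if err.get("code") != DUPLICATE_KEY]
    return duplicates, others


# Hypothetical error details for illustration only
sample_details = {
    "writeErrors": [
        {"index": 0, "code": 11000, "errmsg": "duplicate key error"},
        {"index": 1, "code": 121, "errmsg": "document failed validation"},
    ]
}

duplicates, others = split_write_errors(sample_details)
print(len(duplicates), len(others))  # → 1 1
```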

Finally, we pause the loop for two seconds to slow down the API requests.

    time.sleep(2)

Running the script will take several hours. It always prints out a message on the coin it’s processing and if any errors occurred. As I’ve already run the script before, the output below shows some warnings for duplicate documents.

Screenshot 2022-07-26 at 12.48.55.png
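
A quick back-of-the-envelope calculation shows why the run takes hours. The figures below are assumptions for illustration: the number of coins and the average request time are not taken from Coingecko, and retries after rate-limit errors would add to the total.

```python
def calls_per_minute(pause_seconds=2.0, request_seconds=0.5):
    """Approximate request rate when each iteration makes one call and then pauses."""
    return 60 / (pause_seconds + request_seconds)


def estimate_runtime_hours(n_coins, pause_seconds=2.0, request_seconds=0.5):
    """Approximate total runtime for one call per coin, ignoring retries."""
    return n_coins * (pause_seconds + request_seconds) / 3600


# Assumed: 13,000 listed coins and 0.5 s per request
print(calls_per_minute())                        # → 24.0
print(round(estimate_runtime_hours(13_000), 1))  # → 9.0
```

With these assumptions, the two-second pause keeps the script at roughly 24 calls per minute, inside the 10-50 range mentioned earlier, at the cost of about nine hours for a full pass.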

Conclusion

We can now check our database on MongoDB Compass. If you’ve followed the standard installation procedure, you can connect to your localhost instance with this URI: mongodb://localhost:27017

Screenshot 2022-07-22 at 19.06.24.png

After that, you’ll see a document for each coin id and its corresponding data.

Screenshot 2022-07-22 at 19.09.31.png
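
The same documents can also be read back from Python. Below is a minimal sketch of a query helper; latest_snapshot is a hypothetical name, and it assumes last_updated is stored as the ISO 8601 string returned by the API, so that a descending sort puts the newest snapshot first.

```python
def latest_snapshot(collection, coin_id):
    """Return the most recent stored document for coin_id, or None if there is none."""
    return collection.find_one(
        {"id": coin_id},
        projection={"_id": 0},        # drop MongoDB's internal ObjectId from the result
        sort=[("last_updated", -1)],  # newest first
    )


# Usage, assuming the coins collection from this tutorial and a running MongoDB:
# doc = latest_snapshot(coins, "bitcoin")
# print(doc["market_cap"], doc["circulating_supply"])
```

The filter and sort order match the index created earlier (id ascending, last_updated descending), so MongoDB can answer this kind of lookup from the index.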

In the second part of the tutorial, we’ll get the entire set of historical data, such as prices, daily volume, market capitalization and fully diluted valuations for all these coins. Sign up below to get notified when it is released.

Full script

for i in range(0, len(data_coins["id"])):
    
    # read coin id from data_coins dataframe
    coin_id = data_coins["id"][i]
    
    # trying max 5 times to connect to coingecko API
    for attempt in range(5):
    
        try: 
            data = cg.get_coin_by_id(id = coin_id, vs_currency = "usd")
            print(f"✅ retrieved data for {coin_id}")

        except Exception as e:
            print(f"❌ error retrieving data from coingecko for {coin_id}: {e}. Waiting 30s to try again")

            time.sleep(30)

        else:
            break

    try: 
        data_coins.loc[data_coins["id"]==coin_id, "last_updated"] = data["last_updated"]
        data_coins.loc[data_coins["id"]==coin_id, "total_supply"] = data["market_data"]["total_supply"]
        data_coins.loc[data_coins["id"]==coin_id, "max_supply"] = data["market_data"]["max_supply"]
        data_coins.loc[data_coins["id"]==coin_id, "circulating_supply"] = data["market_data"]["circulating_supply"]
        data_coins.loc[data_coins["id"]==coin_id, "fully_diluted_valuation"] = data["market_data"]["fully_diluted_valuation"].get("usd")
        data_coins.loc[data_coins["id"]==coin_id, "market_cap"] = data["market_data"]["market_cap"]["usd"]
        data_coins.loc[data_coins["id"]==coin_id, "coingecko_rank"] = data["coingecko_rank"]
        data_coins.loc[data_coins["id"]==coin_id, "coingecko_score"] = data["coingecko_score"]
        data_coins.loc[data_coins["id"]==coin_id, "developer_score"] = data["developer_score"]
        data_coins.loc[data_coins["id"]==coin_id, "community_score"] = data["community_score"]
        data_coins.loc[data_coins["id"]==coin_id, "liquidity_score"] = data["liquidity_score"]
        data_coins.loc[data_coins["id"]==coin_id, "public_interest_score"] = data["public_interest_score"]

    except Exception as e:
        print(f"❌ error writing data for {coin_id}: {e}")
        pass

    try:
        coins.insert_many(data_coins.loc[data_coins["id"] == coin_id].to_dict("records"), ordered = False)

        print(f"✅ Successfully stored data for {coin_id} in MongoDB.")

    except pymongo.errors.BulkWriteError as e:

        # Collect every write error that isn't a duplicate-key error (code 11000)
        panic_list = list(filter(lambda x: x['code'] != 11000, e.details['writeErrors']))
        if len(panic_list) > 0:
            print(f"❌ these are not duplicate errors {panic_list}")
            
        else: print(f"❗ This data for {coin_id} already exists in MongoDB")
        pass
    
    time.sleep(2)

