Building a COVID-19 geosearch index using CSV files, MongoDB, or GraphQL

My kids recently went back to school, which means COVID-19 is again fresh in my mind. I thought it might be interesting to build a COVID-19 geosearch site to explore case counts using a map interface. The first step is selecting a data source and building an index. Creating a hosted index from your well-structured data is crucial to how Algolia returns fast and accurate search results. Creating an index can also be the first challenge for us as developers trying to get search up and running.

I want to build the index for my site using John Hopkins University's aggregated COVID-19 data set. It turns out there are several different ways we can consume this data set. This offers us an opportunity to explore several patterns for ingesting data from different types of back ends, including flat files, databases, and APIs. As developers, we typically don't choose where we get our data. Applications come with pre-existing data sources and defined interfaces. We may even need to interact with multiple data sources to build the type of search needed for our application.

By the way, Algolia continues to offer free Pro plans for not-for-profit COVID-19 related applications if you're looking to build (or have built) something similar yourself.

Getting started

At Algolia, we talk about data ingestion as a three-step indexing process:

  • Fetching data from our data source
  • Transforming the extracted data
  • Sending our records to an index

In the rest of this blog, we'll walk through some strategies to ingest data from each of the available data formats that JHU offers. To follow along, you'll want to create an index under your existing Algolia account or sign up for a free account and set up an application.

You'll find all of the ingest scripts we discuss below in the scripts directory of the COVID-19 geosearch demo repo. Feel free to clone it and test the finished demo. You'll need to create an .env file from the example.env and add your Algolia app information and API key before you run the scripts.

Fetching data from flat files

John Hopkins conveniently publishes their global and US data to a GitHub repo that is updated at least daily. The data itself is stored in CSV format. These files contain the geocode coordinates we need for our mapping front end.

For instance, here's the record for my home county here in Ohio:

39049,Franklin,Ohio,US,2021-08-31 04:21:46,39.96995815,-83.01115755,139178,1531,,,"Franklin, Ohio, US",10569.763874248532,1.0863785943180675

Data scientists find it appealing to use flat files in a repository for storing data because of the emphasis on version control and tracking changes over time. If better information becomes available, they can update the historical records while also capturing the fact that the record has changed. A high-order database may struggle tracking that change. We'll write a quick script to ingest the raw CSV data directly into our index. We can do this using the Python pandas library.

#!python3
import pandas

DATA_FILE = '../COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/08-30-2021.csv'

def main():
  df = pandas.read_csv(DATA_FILE)
  print(df)

if __name__ == "__main__":
    main()

They've named each file with a datestamp, which means we'll eventually need a way to find the filename with the latest data. For now, we're just pinning the ingest to a particular day. The file itself has a lot of data, though. We'll want to trim it down to just the information we need for our search. Since we can't do that during the ingest step, we'll save that for the transformation step and move on to the next data source.

Fetching data from a database

The devs over at MongoDB have imported the JHU data set into a managed MongoDB document store. Consuming from a database instead of the raw files has a couple of advantages. First, they take care of ingesting the CSV files from the Github repo and ensure they update the data hourly. Living downstream of the document store means we don't have to do the work of sorting through the flat files and finding the latest data. Additionally, MongoDB gives us a managed endpoint and a known interface, reducing friction for our developers since we no longer have to deal with the bespoke data structure of the flat files.

We can connect to their document store directly using pymongo and use its metadata to retrieve the latest records.

#!python3
from pymongo import MongoClient

MDB_URL = "mongodb+srv://readonly:[email protected]/covid19"

def main():
  client = MongoClient(MDB_URL)
  db = client.get_database("covid19")
  stats = db.get_collection("global_and_us")
  metadata = db.get_collection("metadata")

  # Get the last date loaded:
  meta = metadata.find_one()
  last_date = meta["last_date"]

  results = stats.find(
    {
      "date":last_date,
    }
    print(results)

if __name__ == "__main__":
    main()

A crucial part of building our search index is trimming the data down to only what we need to search the data. Since we're using a database, we don't need to ingest the whole document, like with the flat files. Let's scope our query down to just the locality info, geocode coordinates, and confirmed case count. We can also skip any records missing positional data:

results = stats.find(
    {
      "date":last_date,
      "loc":{"$exists": True, "$ne": [] }
    }, {
      "combined_name": 1, 
      "county": 1,
      "state": 1,
      "country": 1,
      "confirmed": 1,
      "loc": 1
    }

This will make our job easier when we transform the data records later.

Fetching data from REST APIs

MongoDB also provides a RESTful API to retrieve the same data without having to connect to the document store. APIs have the advantage of needing little more than a secure HTTP connection to consume. The challenge comes from complex data requests that require intimate knowledge of the API design, as we'll see below.

First, let's build our REST request:

#!python3
import json
import requests

METADATA_URL = 'https://webhooks.mongodb-stitch.com/api/client/v2.0/app/covid-19-qppza/service/REST-API/incoming_webhook/metadata'
REST_URL = 'https://webhooks.mongodb-stitch.com/api/client/v2.0/app/covid-19-qppza/service/REST-API/incoming_webhook/global_and_us'

def main():
  meta = requests.get(METADATA_URL)
  last_date = meta.json()['last_date']
  query = {
    'min_date': last_date,
    'max_date': last_date,
    'hide_fields': '_id, fips, country_code, country_iso2, country_iso3, population, deaths, confirmed_daily, deaths_daily, recovered, recovered_daily'
  }
  response = requests.get(REST_URL, params=query)
  print(response.json())

if __name__ == "__main__":
    main()

Again, we use the metadata to make sure we're grabbing only the latest data. Notice that the REST API exposes a hide_fields parameter we can use as a negative filter to get just the fields we need. It’s not as straightforward as just listing the fields we want, but there might have been reasons for designing the API like that. With APIs, we must work with the interface we’re given. Fortunately, as we’ll see in the next section, this is a problem developers have been thinking about.

Fetching data from GraphQL APIs

The final interface we explore is an authenticated GraphQL API. GraphQL provides the same HTTPS-accessible interface as the REST API, but adds a normalized query language that removes the need for deep familiarity with individual APIs. Once again, we use the metadata to find the date of the latest data and retrieve only the fields we need. However, because this API is authenticated, we must first hit an authentication endpoint and grab a short-lived access token:

#!python3
import json
import requests

GRAPHQL_AUTH = "https://realm.mongodb.com/api/client/v2.0/app/covid-19-qppza/auth/providers/anon-user/login"
GRAPHQL_URL  = "https://realm.mongodb.com/api/client/v2.0/app/covid-19-qppza/graphql"

def main():
  response = requests.get(GRAPHQL_AUTH)
  access_token =  response.json()['access_token']

  headers = {}
  headers["Accept"] = "application/json"
  headers["Content-Type"] = "application/json"
  headers["Authorization"] = "Bearer {}".format(access_token)

  metadata = requests.post(GRAPHQL_URL, headers=headers, json={'query': 'query { metadatum{ last_date }}'})
  if metadata.status_code != 200:
    raise Exception(f"Query failed to run with a {response.status_code}.")
  last_date = metadata.json()['data']['metadatum']['last_date']

  query = '''query {
    global_and_us(query: { date: "''' + last_date + '''" }, limit:5000)
    { confirmed county state country combined_name loc { type coordinates }}
}'''

  response = requests.post(GRAPHQL_URL, headers=headers, json={'query': query})
  if response.status_code != 200:
    raise Exception(f"Query failed to run with a {response.status_code}.")
  print(response.json())

if __name__ == "__main__":
    main()

This interface doesn't bring much new to the table. The one thing it has over the REST API is that it's standardized. GraphQL really shines when we query across multiple, disparate back ends or thinly slice our data for mobile or low-bandwidth use cases. It may be interesting to bring in additional data after a search, but we can do that with the other interfaces with less overhead.

Choosing our data source

There are pros and cons to each of these ingest methods. Consuming the flat files directly gets us closest to the data and includes some global municipality data that's not in the downstream sources. However, consuming these files means additional code to figure out which file has the latest data. We'll also need to do our own data normalization.

Consuming the files via MongoDB provides a simple interface and builds on top of a managed data ingestion layer to the flat files, but at the cost of some fidelity in the global data. For this use case, the REST and GraphQL APIs don't provide much beyond what we get from consuming the upstream sources. All three (MongoDB and the two API methods) provide query interfaces to filter records and retrieve only the attributes we need for our index.

As we think about keeping the data in sync over time, we may find the MongoDB interface appealing because we can use the metadata to ensure we're always grabbing the most recent case counts. As much as I love the additional data we get by consuming the raw CSV files, we'd have to build a way to monitor the repo to ensure we're always getting the most recent data.

Transforming our data

After selecting our data source, our next task is shaping the data for our index. Algolia indices must be in JSON format with a unique objectID per record. It makes sense to map these IDs to an existing ID in our data source. The Algolia API can create these IDs for us, but we don't recommend this. Beyond these requirements, the goal here is to trim our records down to the simplest form that still provides the right balance between performant search and useful results.

We are creating a geographic representation of our COVID-19 data, so at a bare minimum, we will need the geocode coordinates and the confirmed cases count for each record we want to display. To make the data easier to consume, we should include locality information like county or province to display on the map, and states and countries to improve the search relevance.

Here's our transform code for the CSV flat-file ingestion, again using pandas:

covid_records = []
  for index, row in df.iterrows():
    # Skip locations w/o coordinates
    if pandas.isna(row['Lat']):
      print('Skipping {}: No geocode'.format(row['Combined_Key']))
    else:
      covid_record = {}
      covid_geocode = {}
      print(row['Combined_Key'])
      covid_record['`objectId`'] = row['Combined_Key']
      # Let's not use the combined key for US counties, instead let's use county and state  
      if pandas.isna(row['Admin2']):
        covid_record['location'] = row['Combined_Key']
      else:
        covid_record['location'] = row['Admin2'] + ', ' + row['Province_State']
      covid_record['country'] = row['Country_Region']
      covid_record['confirmed_cases'] = int(row['Confirmed'])
      covid_geocode['lat'] = row['Lat']
      covid_geocode['lng'] = row['Long_']
      covid_record['_geoloc'] = covid_geocode
      covid_records.append(covid_record)

The ingest code remains fairly consistent across all the ingest patterns. That makes sense since they all inherit the shape of the CSV files that are the root source. Here's the same code for our MongoDB source:

covid_records = []
  for row in response.json():
    # Unassigned and Unknown records are alread scrubbed in this DB
    # Skip 'US' and 'Canada' since they have incomplete data
    # and locations w/o coordinates
    if row['combined_name'] != 'US' and row['combined_name'] != 'Canada' and 'loc' in row:
      covid_record = {}
      covid_geocode = {}
      print(row['combined_name'])
      covid_record['`objectId`'] = row['combined_name']
      # Let's not use the combined key for US counties, instead let's use county and state  
      if 'county' in row:
        covid_record['location'] = row['county'] + ', ' + row['state']
      else:
        covid_record['location'] = row['combined_name']
      covid_record['country'] = row['country']
      covid_record['confirmed_cases'] = row['confirmed']
      covid_geocode['lat'] = row['loc']['coordinates'][1]
      covid_geocode['lng'] = row['loc']['coordinates'][0]
      covid_record['_geoloc'] = covid_geocode
      covid_records.append(covid_record)
    else:
      print('Skipping {}: No geocode'.format(row['combined_name']))

In both cases, we are left with a normalized Python dictionary with the attributes we need for search and discovery.

Building our index

The last step is to send our normalized data to the Algolia Search API. At this point, the code is boilerplate using the Python SDK:

client = SearchClient.create(os.getenv('APP_ID'), os.getenv('API_KEY'))
  index = client.init_index(os.getenv('ALGOLIA_INDEX_NAME'))
  index.clear_objects()
  index.save_objects(covid_records)

This code clears out any existing records and rebuilds the index using the latest data. By using the Python client to interact with the Algolia API, we don't have to worry about things like record batching and API retries, reducing the amount of code needed here. It also ensures zero downtime.

Most of the index defaults work just fine for this use case. The only changes I'd like to make are to limit our searchable attributes to just country and location, and rank our results in descending order by the number of cases. High case counts tend to map to large population centers, which makes the data more useful when we zoom out on the map.

Here's the snippet from my index configuration:

{
...
  "searchableAttributes": ["unordered(country)", "unordered(location)"],
  "ranking": ["typo", "geo", "words", "filters", "proximity", "attribute", "exact", "custom"],
  "customRanking": ["desc(confirmed_cases)"],
...
}

Next steps

After the initial data ingest, we also need to decide how to keep our index up to date as the underlying data changes. There are a number of patterns we can follow here depending on how we initiate the update and modify the index.

Rebuilding our index

Since our COVID-19 data set has a fixed number of records, clearing and rebuilding the index every few hours or a few times a day isn't particularly heavy. We can do this at fixed intervals using a scheduled serverless function or embedding our script in our build process. If we wanted even closer to real-time updates, we could set up a GitHub action on the COVID-19 repo to trigger a rebuild of our index whenever we merge a new PR.

Atomic rebuilds for production data

For a larger production index, we'll want to avoid the downtime as an index rebuilds or the possibility of a failed update. For this reason, we'll use the replace all objects method to perform an atomic reindex. Rather than rebuilding the index in place, this method creates a new index and only cuts over after all records are saved, reducing downtime and ensuring the action is, in fact, atomic.

Incremental updates

Sometimes it makes sense to perform incremental updates to individual records or even individual fields of data rather than rebuilding the entire index. For instance, if we were tracking confirmed cases over time, we would need to take into account occasional updates to historical data. It's not necessary to rebuild the entire index for a small data change. It makes more sense to monitor for changes using webhook subscriptions or by monitoring the files directly. Then we can incrementally add or update records in batches as data changes. In this pattern, the unique objectId becomes critical to reference the existing records during updates.

Building a front end

Now that we have our index, we can focus on building our front end. The geosearch widget from the Algolia React Instantsearch library lets us build a front end fairly trivially using Google Maps. You can take a sneak peek n the repo, but we'll save implementing that for another day.

24