32
Computing the Pearson correlation matrix on huge datasets in Python
One of the latest tasks at GoodIP was to calculate the similarities between around 480k items having around 800 observations in the range of 0–50k each. Reducing the dimensionality would compromise the quality of the long-tail results, which is undesirable. The following article evaluates the performance of different implementations, describes how to split the dataset into multiple chunks and process them in parallel. I also want to mention DeepGraph, which provides an interesting, different approach.
As a short refresher, the Pearson correlation assigns a value in the range of −1, signifying a negative correlation (if the value in A increases, the corresponding value in B decreases) and 1, a positive correlation (if A increases B also increases). A value of 0 means no correlation. It is defined as
where x̄, and ȳ are the means of values in x and y.
In our case, the CSV file containing the dataset is around 1GB in size. We are going to load it and create a NumPy ndarray from it.
items = pd.read_csv("./items.csv")
numpy_items = items.to_numpy()
Simply trying to run np.corrcoef(numpy_items) raises the exception MemoryError: Unable to allocate 1.6 TiB for an array with shape (480000, 480000) and data type float64 — which illustrates the dimension of the problem. The actual number of pairings that actually need to be processed is the number of different, unordered combinations — choosing r objects from a set of n objects. nCr(n,r)=n!/r!(n−r)! In this case nCr(480000,2), but still 115 199 760 000 combinations.
Let’s first compare the performance of different implementations from Pandas, NumPy, and CuPy and find out which works with parallelization. Therefore we only use a 1% subset of the data.
import numpy as np
import cupy as cp
import pandas as pd
from scipy.stats import pearsonr
cp.cuda.set_allocator(None) # Disable cache
items = items[0:5000]
numpy_items = items.to_numpy()
cupy_items = cp.array(numpy_items)
# Pandas implementation
%%timeit -n 1 -r 5
r1 = items.T.corr()
# 1 loop, best of 5: 37 s per loop
# NumPy implementation
%%timeit -n 1 -r 5
r2 = np.corrcoef(numpy_items)
# 1 loop, best of 5: 1.21 s per loop
# CuPy implementation (on Tesla K80)
%%timeit -n 1 -r 5
r3 = cp.corrcoef(cupy_items)
# 1 loop, best of 5: 66 ms per loop
# NumPy custom
%%timeit -n 1 -r 5
ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = numpy_profiles - ms
datass = np.sqrt(np.sum(datam*datam, axis=1))
r4 = []
for i in range(numpy_profiles.shape[0]):
temp = np.dot(datam[i:], datam[i].T)
rs = temp / (datass[i:] * datass[i])
r4.append(rs)
# 1 loop, best of 5: 44.7 s per loop
# CuPy custom version (on Tesla K80)
%%timeit -n 1 -r 5
ms = cupy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = cupy_profiles - ms
datass = cp.sqrt(cp.sum(datam*datam, axis=1))
r5 = []
for i in range(cupy_profiles.shape[0]):
temp = cp.dot(datam[i:], datam[i].T)
rs = temp / (datass[i:] * datass[i])
r5.append(rs)
# 1 loop, best of 5: 2.35 s per loop
GPUs perform great with matrix operations, which could be the cause why CuPy outperforms the other implementations by far. This approach could also outperform our parallelized CPU approach and will be evaluated in a follow-up article. In the following, however, we use the custom implementation that utilizes NumPy, precomputed means, and can easily be distributed to different processes.
As we do not want to precompute all pairings, mainly due to memory constraints, the approach is to take one row, compute the correlation with all other rows below that index, increment the index, etc. This way, we exclude reflexive comparisons and use the commutativity of the function by ignoring the order. The computational effort for the first indices is then way higher than for the last, and we only want to split at full index positions, both taken into consideration with the following naive split. We need the nCr function for the number of total combinations.
import operator as op
def ncr(n: int, r: int) -> int:
"""
Calculates the number of different, unordered combination
of r objects from a set of n objects.
nCr(n,r) = nPr(n,r) / r!
Args:
n (int): Number of objects in set
r (int): Number of objects to combine
Returns:
int: Number of combinations
"""
r = min(r, n-r)
numer = reduce(op.mul, range(n, n-r, -1), 1)
denom = reduce(op.mul, range(1, r+1), 1)
return numer // denom
Since Python 3.8 you can also use math.comb. The following code finds the indices that split the index into n_chunks close to equal sized parts.
n_chunks = 50 # Number of chunks
n = numpy_items.shape[0] # Number items
nr_pairings = ncr(numpy_items.shape[0], 2) # Number of all pairings
# Minimum nr. of pairings per chunk
per_chunk = int(math.ceil(nr_pairings/(n_chunks - 1)))
split_indices = [] # Array containing the indices to split at
t = 0
for i in range(n + 1):
# There are n - i pairings at index i
t += n - i
# If the current chunk has enough pairings
if t >= per_chunk:
split_indices.append(i)
t = 0
We can check the number of pairings in each chunk running:
s_indices = [0] + split_indices + [n]
pairings_chunks = []
for i in range(len(s_indices)-1):
start = s_indices[i]
end = s_indices[i + 1]
pairings_chunks.append(sum(range(n - end, n - start)))
At first, we precompute the means of the dataset. As the data is read-only, we can just define it in the global scope. All the child processes will then be able to access it, and it will not be copied — provided you don’t write to it.
ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]
datam = numpy_profiles - ms
datass = np.sqrt(np.sum(datam*datam, axis=1))
The initial idea was to send the results from the workers to a dedicated storage worker through a queue. Unfortunately, this decreased the performance significantly due to blocking and serialization. The fastest option to implement was to write the results to disk directly from the worker.
output_dir = "/var/tmp/"
def processor_fun(*, i_start: int, i_end: int) -> None:
"""
Calculate correlation of rows in the range of i_start and i_end
and the rows below these indices.
Args:
i_start (int): Index of the start row
i_end (int): Index of the end row
Returns:
None
"""
for i in range(i_start, i_end):
temp = np.dot(datam[i:], datam[i].T)
rs = temp / (datass[i:] * datass[i])
rs = np.delete(rs, [0])
# Create nparray that contains information about the indices
res = np.zeros((rs.shape[0], 3))
res[:, 0] = i
res[:, 1] = list(range(i+1, i + rs.shape[0] + 1))
res[:, 2] = rs
# Save CSV file
np.savetxt(f'{output_dir}/{i}.csv', res,
delimiter=',',
newline='\n',
fmt=['%d','%d','%0.13f'])
# Create pool of worker processes which will carry out the
# computation
n_cpus = mp.cpu_count()
pool = mp.Pool(n_cpus)
# Start workers
s_indices = [0] + split_indices
workers = []
for i in range(0, len(s_indices)):
start = s_indices[i]
end = s_indices[i+1] if i < len(s_indices)-1 else n-1
workers.append(pool.apply_async(
processor_fun,
kwds={'i_start': start, 'i_end': end}))
for r in workers:
r.wait()
# Close the pool and wait till all workers are done
pool.close()
pool.terminate()
pool.join()
With our dataset, this creates 480k CSV files with a total size of approx. 4TB. We batched and gzip compressed the results to ~ 4GB parts and loaded them into Google BigQuery for production use.
32