Lecture 14: The Need for Speed

You will learn how to time your code and locate its bottlenecks. You will learn how to alleviate such bottlenecks using techniques such as comprehensions, generators, vectorization and parallelization. You will be introduced to the Numba library and how to use it to speed up your code. You will hear about the fundamental computational costs of mathematical operations and memory management (caching).

[ ]
import time
import numpy as np
import pandas as pd
from scipy import optimize

%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('seaborn-whitegrid')

%load_ext autoreload
%autoreload 2

# performance libraries
import numba as nb
import joblib # conda install joblib
import dask # conda install dask
import dask.dataframe as dd

# magics
#  conda install line_profiler
#  conda install memory_profiler

%load_ext line_profiler
%load_ext memory_profiler

# local module
import needforspeed
[ ]
import psutil
CPUs = psutil.cpu_count()
CPUs_list = set(np.sort([1,2,4,*np.arange(8,CPUs+1,4)])) 
print(f'this computer has {CPUs} CPUs')

1. Computers

We can represent a computer in a simplified diagram as:

[figure: simplified diagram of a computer]

Performance goals:

  1. Minimize the number of logical and algebraic operations
  2. Minimize the number of times new memory needs to be allocated (and the amount)
  3. Minimize the number of read and write memory (and especially storage) operations

Optimizing your code for performance is a complicated task. When using Python, a lot happens under the hood that you do not control.

  • Python is an interpreted language; each line of Python code is converted into machine code at runtime when the line is reached. Error checks and memory management are performed automatically.
  • Faster languages (C/C++, Fortran) are compiled to machine code before the program is run → faster, but you are required to specify e.g. the types of variables beforehand. Error checks and memory management must be performed manually.

Often overlooked: today's CPUs are so fast that feeding them data quickly enough can be a serious bottleneck.

Modern CPUs can do a lot of smart, complicated, stuff.

Single instruction, multiple data (SIMD): The computational cost of multiplying one float by another is the same as multiplying e.g. vectors of 4 doubles at once (or 8 doubles if you have AVX-512).

Out-of-order execution: If you tell the computer to

  1. read data X
  2. run f(X)
  3. read data Y
  4. run g(Y)

then it might try to do step 2 and step 3 simultaneously because they use different parts of the CPU.

Caching: Let x be a one-dimensional numpy array, and assume you read the value in x[i] and then read the value in x[j]. If j is "close" to i then the value of x[j] will already be in the cache and the second read operation will be faster (almost instantaneous).
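A small illustration of the caching effect (a sketch, not part of the original lecture): below, both sums touch the same number of elements, but the strided version must fetch them from locations spread across the whole array.

[ ]
x_big = np.random.uniform(size=16*10**6)
x_contig = x_big[::16].copy() # the same 10**6 elements, stored contiguously
%timeit x_big[::16].sum() # strided reads, cache-unfriendly
%timeit x_contig.sum() # contiguous reads, cache-friendly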

Parallelization: Modern computers have multiple CPUs (or even other computing units such as GPUs). This is to some degree used implicitly by e.g. built-in Numpy and Scipy functions, but we also discuss how to do this manually later in this lecture. While the clock speed of each CPU has stopped increasing for technical reasons, the number of transistors on each chip continues to grow exponentially (Moore's Law), nowadays mainly in the form of additional CPUs.

[figure: Moore's Law]

Memory: We have many different kinds of memory

  1. Cache
  2. RAM (Random Access Memory)
  3. Hard drive

We control what is in the RAM and on the hard drive; the latter is a lot slower than the former. The cache is managed by the computer under the hood.

[figure: the memory hierarchy]

Three important principles:

  1. Use built-in features of Python, Numpy, Scipy etc. whenever possible (often use fast compiled code).
  2. Ordered memory operations are better than random ones.
  3. "Premature optimization is the root of all evil" (Donald Knuth).

There is a trade-off between human time (the time it takes to write the code) and computer time (the time it takes to run the code).

2. Timing and precomputations

Consider the following function doing some simple algebraic operations:

[ ]
def myfun(x,i):
    y = 0
    for j in range(100):
        y += x**j
    return y + i

And another function calling the former function in a loop:

[ ]
def myfun_loop(n):
    mysum = 0
    for i in range(n):
        mysum += myfun(5,i)
    return mysum

How long does it take to run myfun_loop:

A. Manual timing

[ ]
t0 = time.time()
mysum = myfun_loop(1000)
t1 = time.time()    
print(f'{t1-t0:.8} seconds')

B. Use the %time magic (works on a single line)

[ ]
%time mysum = myfun_loop(1000)
%time mysum = myfun_loop(1000)

ms ≡ millisecond, $10^{-3}$ of a second.
µs ≡ microsecond, $10^{-6}$ of a second.
ns ≡ nanosecond, $10^{-9}$ of a second.

C. Use the %timeit magic to also see variability (works on a single line)

[ ]
%timeit myfun_loop(1000)
%timeit -r 5 -n 20 myfun_loop(1000)

%timeit reports the best of r runs, each calling the code n times in a loop
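Outside of a notebook, the standard-library timeit module provides the same functionality; a minimal sketch:

[ ]
import timeit
# best of 5 runs, each calling the function 20 times (mirrors %timeit -r 5 -n 20)
best = min(timeit.repeat('myfun_loop(1000)',globals=globals(),repeat=5,number=20))
print(f'{best/20:.6f} seconds per call')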

D. Use the %%time magic (works on a whole cell)

[ ]
%%time
n = 1000
myfun_loop(n);

E. Use the %%timeit magic to also see variability (works on a whole cell)

[ ]
%%timeit
n = 1000
myfun_loop(n)

Question: How can we speed up the computation using precomputation?

[ ]
def myfun_loop_fast(n):
    pass # write your code here
    
# remember
def myfun_loop(n):
    mysum = 0
    for i in range(n):
        mysum += myfun(5,i)
    return mysum

def myfun(x,i):
    y = 0
    for j in range(100):
        y += x**j
    return y + i

Answer:

[ ]
def myfun_loop_fast(n):
    myfunx = myfun(5,0) # precomputation
    mysum = 0
    for i in range(n):
        mysum += myfunx + i
    return mysum
[ ]
t0 = time.time()
mysum_fast = myfun_loop_fast(1000)
t1 = time.time()    
print(f'{t1-t0:.8f} seconds')

Too fast to be measured with time.time(). The %timeit magic still works:

[ ]
%timeit myfun_loop(1000)
%timeit myfun_loop_fast(1000)

→ orders of magnitude faster!
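Aside: for manual timing of short snippets, the standard library's time.perf_counter() uses a higher-resolution clock than time.time():

[ ]
t0 = time.perf_counter()
mysum_fast = myfun_loop_fast(1000)
t1 = time.perf_counter()
print(f'{t1-t0:.8f} seconds')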

Check the results are the same:

[ ]
assert mysum == mysum_fast

2.1 Premature optimization is the root of all evil

Important: Before deciding whether to do a precomputation (which often makes the code harder to read) we should investigate whether it alleviates a bottleneck.

  • A. Insert multiple time.time() to time different parts of the code.
  • B. Use the line_profiler with syntax (also works with methods for classes)

    %lprun -f FUNCTION_TO_PROFILE -f FUNCTION_TO_PROFILE FUNCTION_TO_RUN

Baseline method:

[ ]
%lprun -f myfun -f myfun_loop myfun_loop(1000)

Observation: Most of the time is spent in myfun(), more specifically the computation of the power in line 4. The precomputation solves this problem.

Compare with the fast method:

[ ]
%lprun -f myfun_loop_fast myfun_loop_fast(1000)
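Outside of a notebook, the standard library's cProfile module offers similar profiling at the function level (rather than line by line); a minimal sketch:

[ ]
import cProfile
cProfile.run('myfun_loop(1000)',sort='cumulative') # per-function timing statistics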

3. List comprehensions are your friend

We can find the first $n$ squares using a loop:

[ ]
def squares(n):
    result = []
    for i in range(n):
        result.append(i*i)
    return result

Or in a list comprehension:

[ ]
def squares_comprehension(n):
    return [i*i for i in range(n)]

They give the same result:

[ ]
n = 1000
mylist = squares(n)
mylist_fast = squares_comprehension(n)
assert mylist == mylist_fast

But the list comprehension is faster:

[ ]
%timeit mylist = squares(n)
%timeit mylist_fast = squares_comprehension(n)

Question: Why is this slower?

[ ]
%timeit [i**2 for i in range(1,n+1)]

3.1 Generators

Assume you are only interested in the sum of the squares. Can be calculated as follows:

[ ]
squares_list = [i*i for i in range(n)]
mysum = 0
for square in squares_list:
    mysum += square

Problem: In line 1 we create the full list even though we only need one element at a time
→ we allocate memory we need not allocate.

Solution: Can be avoided with a generator.

[ ]
squares_generator = (i*i for i in range(n)) # notice: parentheses instead of brackets
mysum_gen = 0
for square in squares_generator:
    mysum_gen += square

assert mysum == mysum_gen
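Note that the built-in sum accepts a generator directly, so the computation can be written in a single line (in line with principle 1 from above):

[ ]
mysum_gen_alt = sum(i*i for i in range(n))
assert mysum == mysum_gen_alt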

The memory footprint can be investigated with the memory_profiler with syntax

%mprun -f FUNCTION_TO_PROFILE -f FUNCTION_TO_PROFILE FUNCTION_TO_RUN

Caveat: Needs to be a function in an external module.

[ ]
%mprun -f needforspeed.test_memory needforspeed.test_memory(10**6)

MiB ≡ mebibyte; 1 MiB = 1.048576 MB.

Numpy: Note how you can save memory by specifying the data type for the numpy array.
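A quick way to see such savings (an illustration, not part of the needforspeed module) is the nbytes attribute:

[ ]
print(f'{np.ones(10**6).nbytes/1024**2:.1f} MiB') # float64, 8 bytes per element
print(f'{np.ones(10**6,dtype=np.float32).nbytes/1024**2:.1f} MiB') # float32, 4 bytes per element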

Alternative: Generators can also be created as functions with a yield instead of a return

[ ]
def f_func(n):
    for i in range(n):
        yield i*i

squares_generator = f_func(n)
mysum_gen = 0
for square in squares_generator:
    mysum_gen += square

assert mysum == mysum_gen

3.2 Details on generators (+)

Like everything else in Python, a generator is just a special kind of class:

[ ]
class f_class():
    
    def __init__(self,n):    
        self.i = 0
        self.n = n
    
    def __iter__(self):
        # print('calling __iter__')
        return self
    
    def __next__(self):
        # print('calling __next__')
        if self.i < self.n:
            cur = self.i*self.i
            self.i += 1
            return cur
        else:
            raise StopIteration()
    
squares_generator = f_class(n)
mysum_gen = 0
for square in squares_generator:
    mysum_gen += square

assert mysum == mysum_gen

Note: for x in vec first calls iter on vec and then calls next repeatedly.

[ ]
squares_generator = iter(f_class(n))
print(next(squares_generator))
print(next(squares_generator))
print(next(squares_generator))
print(next(squares_generator))

Illustrative example:

[ ]
def g():
    print('first run')
    yield 1
    print('running again')
    yield 9
    print('running again again')
    yield 4
    
mygen = iter(g())
print(next(mygen))
print(next(mygen))
print(next(mygen))
try:
    print(next(mygen))
except StopIteration:
    print('no more values to yield')
[ ]
for x in g():
    print(x)

4. Optimizing Numpy

4.1 Tip 1: Always use vectorized operations when available

Simple comparison:

[ ]
x = np.random.uniform(size=500000)

def python_add(x):
    y = []
    for xi in x:
        y.append(xi+1)
    return y

def numpy_add(x):
    y = np.empty(x.size)
    for i in range(x.size):
        y[i] = x[i]+1
    return y

def numpy_add_vec(x):
    return x+1

assert np.allclose(python_add(x),numpy_add(x))
assert np.allclose(python_add(x),numpy_add_vec(x))

%timeit python_add(x)
%timeit numpy_add(x)
%timeit numpy_add_vec(x)

Even stronger when the computation is more complicated:

[ ]
def python_exp(x):
    y = []
    for xi in x:
        y.append(np.exp(xi))
    return y

def numpy_exp(x):
    y = np.empty(x.size)
    for i in range(x.size):
        y[i] = np.exp(x[i])
    return y

def numpy_exp_vec(x):
    return np.exp(x)

assert np.allclose(python_exp(x),numpy_exp(x))
assert np.allclose(python_exp(x),numpy_exp_vec(x))

%timeit python_exp(x)
%timeit numpy_exp(x)
%timeit numpy_exp_vec(x)

Also works for a conditional sum:

[ ]
def python_exp_cond(x):
    return [np.exp(xi) for xi in x if xi < 0.5]

def numpy_exp_vec_cond(x):
    y = np.exp(x[x < 0.5])
    return y

def numpy_exp_vec_cond_alt(x):
    y = np.exp(x)[x < 0.5]
    return y

assert np.allclose(python_exp_cond(x),numpy_exp_vec_cond(x))
assert np.allclose(python_exp_cond(x),numpy_exp_vec_cond_alt(x))

%timeit python_exp_cond(x)
%timeit numpy_exp_vec_cond(x)
%timeit numpy_exp_vec_cond_alt(x)

Question: Why do you think the speed-up is less pronounced in this case?

4.2 Tip 2: Operations are faster on rows than on columns

Generally, operate on the outermost index.

[ ]
n = 1000
x = np.random.uniform(size=(n,n))

def add_rowsums(x):
    mysum = 0
    for i in range(x.shape[0]):
        mysum += np.sum(np.exp(x[i,:]))
    return mysum
            
def add_colsums(x):
    mysum = 0
    for j in range(x.shape[1]):
        mysum += np.sum(np.exp(x[:,j]))
    return mysum

assert np.allclose(add_rowsums(x),add_colsums(x))
            
%timeit add_rowsums(x)
%timeit add_colsums(x)

The memory structure can be changed manually so that working on columns (innermost index) is better than working on rows (outermost index):

[ ]
y = np.array(x,order='F') # the default is order='C'
%timeit add_rowsums(y)
%timeit add_colsums(y)

4.3 Tip 3: Also use vectorized operations when it is a bit cumbersome

Consider the task of calculating the following expected value:

$$
\begin{aligned}
W(a)&=\mathbb{E}\left[\sqrt{\frac{a}{\psi}+\xi}\right]\\
\psi,\xi&\in
\begin{cases}
0.25 & \text{with prob. }0.25\\
0.5 & \text{with prob. }0.25\\
1.5 & \text{with prob. }0.25\\
1.75 & \text{with prob. }0.25
\end{cases}
\end{aligned}
$$

for a vector of $a$-values.

Setup:

[ ]
N = 5000
a_vec = np.linspace(0,10,N)

xi_vec = np.array([0.25,0.5,1.5,1.75])
psi_vec = np.array([0.25,0.5,1.5,1.75])

xi_w_vec = np.ones(4)/4
psi_w_vec = np.ones(4)/4

Loop based solution:

[ ]
def loop(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec):
    
    w_vec = np.zeros(a_vec.size)
    for i,a in enumerate(a_vec):        
        for xi,xi_w in zip(xi_vec,xi_w_vec):
            for psi,psi_w in zip(psi_vec,psi_w_vec):
                m_plus = a/psi + xi
                v_plus = np.sqrt(m_plus)
                w_vec[i] += xi_w*psi_w*v_plus
    
    return w_vec
        
loop_result = loop(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec)  
%timeit loop(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec)      

Prepare vectorized solution:

[ ]
def prep_vec(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec):
    
    # a. make a (1,N) instead of (N,)
    a = a_vec.reshape((1,N))
    
    # b. make xi and psi to be (xi.size*psi.size,1) vectors
    xi,psi = np.meshgrid(xi_vec,psi_vec)     
    xi = xi.reshape((xi.size,1))
    psi = psi.reshape((psi.size,1))
    
    # c. make xi_w and psi_w to be (xi_w.size*psi_w.size,1) vectors
    xi_w,psi_w = np.meshgrid(xi_w_vec,psi_w_vec)     
    xi_w = xi_w.reshape((xi_w.size,1))
    psi_w = psi_w.reshape((psi_w.size,1))
    
    return a,xi,psi,xi_w,psi_w

a,xi,psi,xi_w,psi_w = prep_vec(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec)
%timeit prep_vec(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec)

Apply vectorized solution:

[ ]
def vec(a,xi,psi,xi_w,psi_w):   
    m_plus_vec = a/psi + xi # use broadcasting, m_plus_vec.shape = (xi.size*psi.size,N)
    v_plus_vec = np.sqrt(m_plus_vec) # vectorized function call
    w_mat = xi_w*psi_w*v_plus_vec
    w_vec = np.sum(w_mat,axis=0) # sum over rows
    return w_vec

vec_result = vec(a,xi,psi,xi_w,psi_w)
assert np.allclose(loop_result,vec_result)
%timeit vec(a,xi,psi,xi_w,psi_w)

Conclusion: Much much faster.

Apply vectorized solution without preparation:

[ ]
def vec(a,xi,psi,xi_w,psi_w):   
    m_plus_vec = a[:,np.newaxis,np.newaxis]/psi[np.newaxis,:,np.newaxis] + xi[np.newaxis,np.newaxis,:]
    v_plus_vec = np.sqrt(m_plus_vec)
    w_mat = xi_w[np.newaxis,np.newaxis,:]*psi_w[np.newaxis,:,np.newaxis]*v_plus_vec
    w_vec = np.sum(w_mat,axis=(1,2))
    return w_vec

vec_result_noprep = vec(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec)
assert np.allclose(loop_result,vec_result_noprep)
%timeit vec(a_vec,xi_vec,psi_vec,xi_w_vec,psi_w_vec)

5. Numba

Writing vectorized code can be cumbersome, and in some cases it is impossible. Instead we can use the numba module.

Adding the decorator nb.njit on top of a function tells numba to compile this function to machine code just-in-time. This takes some time when the function is called the first time, but subsequent calls are then a lot faster. The input types are inferred on the first call; if the function is later called with new input types, it is compiled again for those types.

[ ]
def myfun_numpy_vec(x1,x2):
    y = np.empty((1,x1.size))
    I = x1 < 0.5
    y[I] = np.sum(np.exp(x2*x1[I]),axis=0)
    y[~I] = np.sum(np.log(x2*x1[~I]),axis=0)
    return y

# setup
x1 = np.random.uniform(size=10**6)
x2 = np.random.uniform(size=int(100*CPUs/8)) # adjust the size of the problem
x1_np = x1.reshape((1,x1.size))
x2_np = x2.reshape((x2.size,1))

# timing
%timeit myfun_numpy_vec(x1_np,x2_np)

Numba: The first call is slower, but the result is the same, and the subsequent calls are faster:

[ ]
@nb.njit
def myfun_numba(x1,x2):
    y = np.empty(x1.size)
    for i in range(x1.size):
        if x1[i] < 0.5:
            y[i] = np.sum(np.exp(x2*x1[i]))
        else:
            y[i] = np.sum(np.log(x2*x1[i]))
    return y

# call to just-in-time compile
%time myfun_numba(x1,x2)

# actual measurement
%timeit myfun_numba(x1,x2)

assert np.allclose(myfun_numpy_vec(x1_np,x2_np),myfun_numba(x1,x2))

Further speed up: Use

  1. parallelization (with prange), and
  2. faster but less precise math (with fastmath)
[ ]
@nb.njit(parallel=True)
def myfun_numba_par(x1,x2):
    y = np.empty(x1.size)
    for i in nb.prange(x1.size): # in parallel across threads
        if x1[i] < 0.5:
            y[i] = np.sum(np.exp(x2*x1[i]))
        else:
            y[i] = np.sum(np.log(x2*x1[i]))
    return y

assert np.allclose(myfun_numpy_vec(x1_np,x2_np),myfun_numba_par(x1,x2))
%timeit myfun_numba_par(x1,x2)
[ ]
@nb.njit(parallel=True,fastmath=True)
def myfun_numba_par_fast(x1,x2):
    y = np.empty(x1.size)
    for i in nb.prange(x1.size): # in parallel across threads
        if x1[i] < 0.5:
            y[i] = np.sum(np.exp(x2*x1[i]))
        else:
            y[i] = np.sum(np.log(x2*x1[i]))
    return y

assert np.allclose(myfun_numpy_vec(x1_np,x2_np),myfun_numba_par_fast(x1,x2))
%timeit myfun_numba_par_fast(x1,x2)
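Aside: in recent versions of Numba (0.49+) the number of threads can be adjusted at runtime with nb.set_num_threads(); a sketch:

[ ]
nb.set_num_threads(max(CPUs//2,1)) # use half of the threads
%timeit myfun_numba_par_fast(x1,x2)
nb.set_num_threads(CPUs) # reset to all threads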

Caveats: Only a limited number of Python and Numpy features are supported inside just-in-time compiled functions.

Parallelization cannot always be used. Some problems are inherently sequential: if the result from a previous iteration of the loop is required in a later iteration, the iterations cannot be executed in parallel (except in some special cases such as summing). The larger the proportion of the code that can be run in parallel, the larger the potential speed-up. This is called Amdahl's Law.
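Formally, if a share $p$ of the run time can be parallelized and is spread across $s$ processes, Amdahl's Law bounds the speed-up by

$$\text{speed-up}(s)=\frac{1}{(1-p)+p/s}$$

which converges to $\frac{1}{1-p}$ as $s\rightarrow\infty$, no matter how many processes are added.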

[figure: Amdahl's Law]

6. Parallelization without Numba

6.1 Serial problem

Assume we need to solve the following optimization problem

[ ]
def solver(alpha,beta,gamma):
    return optimize.minimize(lambda x: (x[0]-alpha)**2 + 
                                       (x[1]-beta)**2 + 
                                       (x[2]-gamma)**2,[0,0,0],method='nelder-mead')

$n$ times:

[ ]
n = 100*CPUs
alphas = np.random.uniform(size=n)
betas = np.random.uniform(size=n)
gammas = np.random.uniform(size=n)

def serial_solver(alphas,betas,gammas):
    results = [solver(alpha,beta,gamma) for (alpha,beta,gamma) in zip(alphas,betas,gammas)]
    return [result.x for result in results]

%time xopts = serial_solver(alphas,betas,gammas)

Numba: Numba cannot be used for parallelization here because we rely on the non-Numba function scipy.optimize.minimize.

6.2 joblib

Joblib can be used to run python code in parallel.

  1. joblib.delayed(FUNC)(ARGS) creates a task to call FUNC with ARGS.
  2. joblib.Parallel(n_jobs=K)(TASKS) executes the tasks in TASKS in K parallel processes.
[ ]
def parallel_solver_joblib(alphas,betas,gammas,n_jobs=1):

    tasks = (joblib.delayed(solver)(alpha,beta,gamma) for (alpha,beta,gamma) in zip(alphas,betas,gammas))
    results = joblib.Parallel(n_jobs=n_jobs)(tasks)
    
    return [result.x for result in results]
    
for n_jobs in CPUs_list:
    if n_jobs > 36: break
    print(f'n_jobs = {n_jobs}')
    %time xopts = parallel_solver_joblib(alphas,betas,gammas,n_jobs=n_jobs)
    print(f'')

Drawback: The inputs to the functions are serialized and copied to each parallel process.

More on Joblib (examples)
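An alternative worth knowing (not covered in this lecture) is the standard library's concurrent.futures; a sketch, assuming solver can be pickled (on some platforms process pools require the function to be importable from a module):

[ ]
from concurrent.futures import ProcessPoolExecutor

def parallel_solver_futures(alphas,betas,gammas,max_workers=1):
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(solver,alphas,betas,gammas) # one task per parameter tuple
        return [result.x for result in results]

%time xopts = parallel_solver_futures(alphas,betas,gammas,max_workers=min(4,CPUs))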

Question: What happens if you remove method='nelder-mead' in the solver() function? Why?

6.3 dask (+)

dask can also be used to run python code in parallel.

  1. dask.delayed(FUNC)(ARGS) creates a task to call FUNC with ARGS.
  2. dask.compute(TASKS,scheduler='processes',num_workers=K) executes the tasks in TASKS in K parallel processes.
[ ]
def parallel_solver_dask(alphas,betas,gammas,num_workers=2):

    tasks = (dask.delayed(solver)(alpha,beta,gamma) for (alpha,beta,gamma) in zip(alphas,betas,gammas))
    results = dask.compute(tasks,scheduler='processes',num_workers=num_workers)
    
    return [result.x for result in results[0]]
    
for num_workers in CPUs_list:
    if num_workers > 36:
        break
    print(f'num_workers = {num_workers}')        
    %time xopts = parallel_solver_dask(alphas,betas,gammas,num_workers=num_workers)
    print('')

Overhead: dask does not work optimally in our situation (the overhead is too large), but it has other interesting features: it can be used on a cluster or to solve more complex problems (see below).

More on dask (examples, youtube tutorial)

Some details on dask

Dask can also handle algorithms where only some parts can be done in parallel, while others must be done sequentially.

[ ]
def inc(x):
    return x + 1

def double(x):
    return 2*x

def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]

output = []
for x in data:
    a = inc(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)
print(total)
[ ]
output = []
for x in data:
    a = dask.delayed(inc)(x)
    b = dask.delayed(double)(x)
    c = dask.delayed(add)(a, b)
    output.append(c)

total = dask.delayed(sum)(output)
print(total.compute())
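The task graph behind total can be drawn with the .visualize() method (requires the graphviz package):

[ ]
total.visualize() # draws the task graph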

7. Pandas

Create a test dataset of $N$ units in $K$ groups.

[ ]
def create_test_data(K,N):
    np.random.seed(1986)
    groups = np.random.randint(low=0,high=K,size=N)
    values = np.random.uniform(size=N)
    df = pd.DataFrame({'group':groups,'value':values})
    return df

K = 10
N = 10**5
df = create_test_data(K,N)
df.head()
[ ]
df.info()

7.1 Example 1: Capping values

A. Loops:

Use a raw loop:

[ ]
def loop(df):
    result = df.value.copy()
    for i in range(len(df)):
        if df.loc[i,'value'] < 0.1:
            result[i] = 0.1
        elif df.loc[i,'value'] > 0.9:
            result[i] = 0.9
    return result

%time loop(df)
loop(df).head()

Use apply row-by-row:

[ ]
def cap(value):
    if value < 0.1:
        return 0.1
    elif value > 0.9:
        return 0.9
    else:
        return value

# slower:
# %time df.apply(lambda x: cap(x['value']),axis=1)

%timeit df.value.apply(lambda x: cap(x))
df.value.apply(lambda x: cap(x)).head()

B. Vectorization: Avoid loop over rows.

Use the transform method:

[ ]
def cap_col(col):
    result = col.copy()
    I = result < 0.1
    result[I] = 0.1
    I = result > 0.9
    result[I] = 0.9
    return result

# slower:
# %timeit df.transform({'value':cap_col})

%timeit df.value.transform(cap_col)
df.value.transform(cap_col).head()

Do it manually:

[ ]
%timeit cap_col(df.value)
cap_col(df.value).head()

Do it manually with a numpy array:

[ ]
def cap_col_np(col):
    result = col.copy()
    I = result < 0.1
    result[I] = 0.1
    I = result > 0.9
    result[I] = 0.9
    return result

%timeit result = pd.Series(cap_col_np(df.value.values))
pd.Series(cap_col_np(df.value.values)).head()

Observation: The manual call of a numpy function is the fastest option.
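In this particular case NumPy even has a built-in function for capping, np.clip, in line with principle 1 from the beginning of the lecture:

[ ]
%timeit result = pd.Series(np.clip(df.value.values,0.1,0.9))
pd.Series(np.clip(df.value.values,0.1,0.9)).head()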

Note: The cap_col_np function could be sped up by Numba just like any other function taking numpy inputs.

[ ]
# write your code here


Answer:

[ ]
@nb.njit
def cap_col_np_nb(col):
    result = col.copy()
    I = result < 0.1
    result[I] = 0.1
    I = result > 0.9
    result[I] = 0.9
    return result

pd.Series(cap_col_np_nb(df.value.values)).head()
[ ]
%timeit result = pd.Series(cap_col_np_nb(df.value.values))

7.2 Example 2: Demean within group

Do it manually:

[ ]
def manually(df):
    result = df.value.copy()
    for group in range(K):
        I = df.group == group
        group_mean = df[I].value.mean()
        result[I] = result[I]-group_mean
    return result
        
%timeit result = manually(df)
manually(df).head()

Use groupby.agg and merge:

[ ]
def demean_agg_merge(df):
    
    means = df.groupby('group').agg({'value':'mean'}).reset_index()
    means = means.rename(columns={'value':'mean'})
    
    df_new = pd.merge(df,means,on='group',how='left')    
    
    return df_new['value'] - df_new['mean']

%timeit demean_agg_merge(df)
demean_agg_merge(df).head()

Use groupby.value.apply:

[ ]
def demean_apply(df):
    return df.groupby('group').value.apply(lambda x: x-x.mean())

%timeit demean_apply(df)
demean_apply(df).head()

Use groupby.value.transform:

[ ]
def demean_transform(df):
    return df.groupby('group').value.transform(lambda x: x-x.mean())

%timeit demean_transform(df)
demean_transform(df).head()

Use groupby.value.transform with built-in mean:

[ ]
def demean_transform_fast(df):
    
    means = df.groupby('group').value.transform('mean')
    result = df.value - means
    return result

%timeit demean_transform_fast(df)
demean_transform_fast(df).head()

Observation: demean_transform_fast is the winner so far.

Parallelization with dask and numba (+)

Create a bigger dataset and set the index to group and sort by it.

[ ]
K = 10
N = 5*10**7
df = create_test_data(K,N)
df = df.set_index('group')
df = df.sort_index()
df.head()
[ ]
df.info()

Standard pandas:

[ ]
%time df.groupby('group').value.max()
%time df.groupby('group').value.mean()
%time df.groupby('group').value.sum()
%time demean_apply(df)
print('')
%time demean_transform_fast(df)
demean_transform_fast(df).head()

Dask dataframe:

We can work with dask dataframes instead, which implies that some computations are done in parallel.

The syntax is very similar to pandas, but far from all features are implemented (e.g. not transform). There are two central differences between dask dataframes and pandas dataframes:

  1. Dask dataframes are divided into partitions, where each partition is a subset of the index in the dataset. Computations can be done in parallel across partitions.
  2. Dask dataframes use lazy evaluation. Nothing is actually done before the .compute() method is called.

The .compute() method returns a pandas series or dataframe.

More info: documentation for dask.dataframe

Note how dask creates partitions based on the index:

[ ]
for k in [2,5,10]:
    ddf = dd.from_pandas(df, npartitions=k)
    print(f'k = {k}:',ddf.divisions)

The time gains are, however, very modest, if present at all:

[ ]
def demean_apply_dd(dff):
    result = dff.groupby('group').value.apply(lambda x: x-x.mean(), meta=('value','float64'))
    return result.compute()

for k in [1,K]:
    
    print(f'number of partitions = {k}')
    %time ddf = dd.from_pandas(df, npartitions=k)
    print('')
    
    %time ddf.groupby('group').value.max().compute()
    %time ddf.groupby('group').value.mean().compute()
    %time ddf.groupby('group').value.sum().compute()  
    %time demean_apply_dd(ddf)
    print('')
    
demean_apply_dd(ddf).head()    

Observations: Some computations are faster after the partitioning (though not on all computers). The missing speed-up is most likely explained by memory fetching, rather than the calculations themselves, being the bottleneck. Generally, the size and complexity of the problem and the number of CPUs determine how large the benefit is.

Numba: Handwritten numba functions for each task can also provide a speed-up in some cases.

[ ]
@nb.njit(parallel=True)
def groupby_demean_numba(group,value,K):
    
    result = np.zeros(value.size)
    for i in nb.prange(K):
        I = group == i
        mean = np.mean(value[I])
        result[I] = value[I]-mean
    
    return result

for _ in range(3):
    %time result = pd.Series(groupby_demean_numba(df.index.values,df.value.values,K))

pd.Series(groupby_demean_numba(df.index.values,df.value.values,K)).head()

Observation: The speed-up is modest. Again the size and complexity of the problem and how many CPUs you have will determine how large the benefit is.

8. Summary

This lecture: You learned that optimizing performance is a difficult task, but the recommendation is to follow this 6-step procedure:

  1. Choose the right algorithm
  2. Implement simple and robust code for the algorithm
  3. Profile the code to find bottlenecks
  4. Use precomputations, comprehensions and vectorization to speed up the code
  5. Still need more speed? Consider numba, joblib or dask
  6. Still not enough? C++ is the next level

Next lecture: Perspectives on other programming languages