Phase-type distributions model time-homogeneous Markov jump processes, but phasic has limited support for time-inhomogeneous processes through the two approaches described below:
from phasic import Graph, with_ipv  # ALWAYS import phasic first to set jax backend correctly
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from vscodenb import set_vscode_theme, vscode_theme

np.random.seed(42)
set_vscode_theme()
sns.set_palette('tab10')
Step-wise construction
nr_samples = 4

@with_ipv([nr_samples] + [0]*(nr_samples-1))
def coalescent_1param(state):
    transitions = []
    for i in range(state.size):
        for j in range(i, state.size):
            same = int(i == j)
            if same and state[i] < 2:
                continue
            if not same and (state[i] < 1 or state[j] < 1):
                continue
            new = state.copy()
            new[i] -= 1
            new[j] -= 1
            new[i+j+1] += 1
            transitions.append([new, [state[i]*(state[j]-same)/(1+same)]])
    return transitions

graph = Graph(coalescent_1param)
N = 10000
graph.update_weights([1/N])
graph.plot()
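As a quick sanity check (a sketch, assuming moments(1)[0] is the first raw moment, matching how moments is used further down in this notebook), the first moment of the absorption time gives the expected time to the most recent common ancestor:

# Expected TMRCA for nr_samples = 4 under the standard coalescent
# with N = 10000 (assumes moments(1)[0] is the first moment).
graph.moments(1)[0]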
PDF/CDF
Say we want to model a population bottleneck where N falls to N_{bottle} at time t_{start} and then returns to N at time t_{end}. To make the approach scale to more changes and allow more than one parameter, we can define the changes as a list of (start_time, parameters) tuples:
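As a minimal sketch (the values of N_bottle, t_start, and t_end below are made up for illustration), the bottleneck could be written as:

# Hypothetical bottleneck: N drops to N_bottle at t_start and recovers
# to N at t_end. Each parameter list holds the coalescence weight (1/N)
# passed to update_weights, as for the one-parameter graph above.
N, N_bottle = 10000, 1000
t_start, t_end = 0.5, 1.5
param_changes = [
    (t_start, [1/N_bottle]),
    (t_end,   [1/N]),
]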
If we pick a time far into the future (like 1000), we can integrate under the distribution to find the expectation: accumulated_occupancy computes the time spent in each state up to a time t, so all you have to do is sum these times.
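A sketch of that computation on the graph built above (1000 is an arbitrary "far future" horizon, per the text):

# Time spent in each transient state up to t = 1000; by then essentially
# all probability mass has been absorbed, so the sum approximates the
# expected absorption time.
np.sum(graph.accumulated_occupancy(1000)).item()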
Note that, unlike all other computations, the PDF and CDF of continuous distributions are not mathematically exact. As you can see, though, the approximation obtained using arbitrarily small discrete steps is very good. If the rates change dramatically, you may have to use the granularity keyword argument to manually set the granularity to a high enough value.
Keep in mind that for time-homogeneous graphs, expected_sojourn_time is both exact and much faster than accumulated_occupancy.
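A quick comparison on the time-homogeneous graph above (a sketch, assuming expected_sojourn_time returns per-state values on the same scale as accumulated_occupancy):

# Exact per-state sojourn times vs. occupancy accumulated far into the
# future; for a time-homogeneous graph the latter converges to the former.
print(np.array(graph.expected_sojourn_time()))
print(np.array(graph.accumulated_occupancy(1000)))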
Marginal expectations
We can scale by a reward and thereby find the marginal expectation of, e.g., the singleton branch length accumulated by time t:
t = 2
reward_matrix = graph.states().T
np.sum(graph.accumulated_occupancy(t) * reward_matrix[0]).item()
1.8362807228049958
That lets us track how the SFS develops over time. Normalized to sum to one, we can see how singletons dominate early in the coalescence process:
# Get states and remove epoch label column
state_mat = graph.states()
rewards = state_mat[:, :-1]  # Remove last column (epoch labels)

# Compute site frequency spectrum
x = np.arange(1, nr_samples)
sfs = np.zeros(nr_samples - 1)
for i in range(nr_samples - 1):
    reward_vec = rewards[:, i]
    transformed_graph = graph.reward_transform(reward_vec)
    sfs[i] = transformed_graph.moments(1)[0]

sns.barplot(x=x, y=sfs, hue=x, width=0.8, palette='viridis', legend=False);
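To actually watch the spectrum develop over time, we can sweep t and reuse the occupancy-times-reward trick from above (a sketch; the grid of time points is arbitrary):

# Accumulated SFS at a grid of time points, normalized to sum to one
# at each time so the changing shape is visible.
ts = np.linspace(0.05, 3, 60)
sfs_t = np.array([[np.sum(graph.accumulated_occupancy(t) * rewards[:, i])
                   for i in range(nr_samples - 1)] for t in ts])
sfs_t /= sfs_t.sum(axis=1, keepdims=True)
plt.stackplot(ts, sfs_t.T, labels=[f'{i}-ton' for i in x])
plt.xlabel('time')
plt.ylabel('fraction of branch length')
plt.legend();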
Comparing to the exact results of Pool and Nielsen for pairwise coalescence time:
def exp_coal(g, N):
    """
    Compute expected coalescence time in an epoch.
    N is the number of diploid individuals.
    g is the number of generations spanned by the epoch.
    """
    # return 2*N - (g * np.exp(-g/(2*N))) / (1 - np.exp(-g/(2*N)))
    return N - (g * np.exp(-g/N)) / (1 - np.exp(-g/N))

def epoch(demog, h, i):
    "Recursively compute expected coalescence time across all epochs"
    g, N = demog[i]
    N *= h
    if i == len(demog)-1:
        # return 2*N
        # return (1-np.exp(-g/(2*N))) * exp_coal(g, N) + np.exp(-g/(2*N)) * (g + epoch(demog, h, i+1))
        return N
    return (1-np.exp(-g/N)) * exp_coal(g, N) + np.exp(-g/N) * (g + epoch(demog, h, i+1))

def pool_nielsen(gens, Ne, h):
    """
    Compute expected coalescence time in units of 2N.
    Ne is a list/series of Ne for each epoch.
    gens is a list/series of the generation at which each epoch begins
    (the last epoch lasts forever).
    h is the relative population size, 0.75 for chrX.
    """
    epochs = list()
    for i in range(len(gens)):
        if i == 0:
            epochs.append((gens[i+1], Ne[i]))
        elif i == len(gens)-1:
            epochs.append((None, Ne[i]))
        else:
            epochs.append((gens[i+1] - gens[i], Ne[i]))
    return epoch(epochs, h, 0)
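A usage sketch with a hypothetical two-epoch demography (the numbers are illustrative, not from the text):

# Two epochs: N = 10000 for the first 5000 generations, then N = 1000
# forever. h = 1.0 for autosomes (0.75 for chrX).
gens = [0, 5000]
Ne = [10000, 1000]
pool_nielsen(gens, Ne, h=1.0)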
for change_time, new_params in param_changes:
    while ctx.time() < change_time:
        cdf.append(ctx.cdf())
        times.append(ctx.time())
        ctx.step()
        if ctx.cdf() >= cdf_cutoff:
            break
    rew_graph.update_weights(new_params)

while ctx.cdf() < cdf_cutoff:
    cdf.append(ctx.cdf())
    times.append(ctx.time())
    ctx.step()
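The accumulated times and cdf lists can then be plotted to show the CDF across the parameter changes (a sketch, assuming both lists were initialized empty before the stepping loop):

plt.plot(times, cdf)
plt.xlabel('time')
plt.ylabel('CDF');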
def coalescent_2param(state, epoch_idx, indexer):
    # all_pairs and the indexer are set up earlier in the notebook
    epoch_idx = int(epoch_idx)
    transitions = []
    if state[indexer.epoch] != epoch_idx:
        return transitions
    for i, j in all_pairs(indexer.lineages):
        pi = indexer.lineages.index_to_props(i)
        pj = indexer.lineages.index_to_props(j)
        if state.sum() <= 1:
            continue
        same = int(pi.ton == pj.ton)
        if same and state[i] < 2:
            continue
        if not same and (state[i] < 1 or state[j] < 1):
            continue
        new = state.copy()
        new[i] -= 1
        new[j] -= 1
        k = indexer.props_to_index(ton=pi.ton + pj.ton)
        new[k] += 1
        coeff = np.zeros(len(epochs)+1+1)  # +1 to have a slot for mutation edges and +1 for edges between epochs
        coeff[epoch_idx] = state[i]*(state[j]-same)/(1+same)
        transitions.append([new, coeff])
    return transitions
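The helper all_pairs is defined elsewhere in the notebook and not shown in this excerpt; a minimal stand-in consistent with how it is used above (unordered index pairs, including self-pairs) could be:

from itertools import combinations_with_replacement

def all_pairs(lineages):
    # Yield all index pairs (i, j) with i <= j, including i == j, so the
    # `same` check above can distinguish self-coalescence. Hypothetical
    # stand-in; the notebook's own definition may differ.
    return combinations_with_replacement(range(len(lineages)), 2)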
def add_epoch(graph, epoch_idx, indexer):
epoch = epochs[epoch_idx]
stop_probs = np.array(graph.stop_probability(epoch))
accum_v_time = np.array(graph.accumulated_occupancy(epoch))
with np.errstate(invalid='ignore'):
epoch_trans_rates = stop_probs / accum_v_time
for i in range(1, graph.vertices_length()-1):
if epoch_trans_rates is None or np.isnan(epoch_trans_rates[i]):
continue
if graph.vertex_at(i).edges_length() == 0:
continue
vertex = graph.vertex_at(i)
state = vertex.state()
        if state[indexer.epoch] != epoch_idx - 1:
continue
sister_state = state.copy()
sister_state[indexer.epoch] = epoch_idx
child = graph.find_or_create_vertex(sister_state)
        coeff = np.zeros(len(epochs)+1+1)  # +1 to have a slot for mutation edges and +1 for transitions between epochs
coeff[-1] = epoch_trans_rates[i]
vertex.add_edge(child, coeff)
graph.extend(coalescent_2param, epoch_idx=epoch_idx, indexer=indexer)
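Reading the variable names above, the epoch-transition rate attached to each vertex appears to be the probability of still occupying that vertex at the end of the epoch, divided by the expected time spent in it:

$$ r_i = \frac{\Pr[X_T = i]}{\mathbb{E}\left[\int_0^T \mathbf{1}\{X_t = i\}\,dt\right]} $$

where $T$ is the epoch boundary, so that probability mass surviving an epoch is moved to the corresponding vertex in the next one.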
def mutation_rate(state):
    coeff = np.zeros(len(epochs)+1+1)
    nr_lineages = sum(state[:-2])
    coeff[-2] = 1  # nr_lineages  # TURN THIS BACK TO nr_lineages FOR CORRECT MUTATIONS ON TOTAL BRANCH LENGTH
    return coeff
graph.update_weights([1/size for size in pop_sizes] + [1] + [1]) # set edge weights
for epoch_idx in range(1, len(epochs)):
    graph.update_weights([1/size for size in pop_sizes] + [1] + [1])  # set weights of new edges
    add_epoch(graph, epoch_idx, indexer)
graph.update_weights([1/size for size in pop_sizes] + [1] + [1])  # set weights of new edges
rewards = graph.discretize(mutation_rate, skip_existing=True)