Discrete phase-type distributions

from phasic import Graph, with_ipv # ALWAYS import phasic first to set jax backend correctly
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import seaborn as sns
from vscodenb import set_vscode_theme

np.random.seed(42)
set_vscode_theme()
sns.set_palette('tab10')

Consider this simple graph representing an exponential distribution. If you must, you can think of it as the coalescence time of two lineages in a very small population:

def exponential(rate):
    graph = Graph(1)
    vertex = graph.find_or_create_vertex([2])
    graph.starting_vertex().add_edge(vertex, 1)
    abs_vertex = graph.find_or_create_vertex([1])
    vertex.add_edge(abs_vertex, rate)
    return graph

graph = exponential(0.4)
graph.plot()

rate = 0.4
x = np.arange(0, 10, 0.01)
graph = exponential(rate)
plt.plot(x, graph.cdf(x));
Figure 1
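For reference, the curve in Figure 1 should coincide with the analytic exponential CDF, 1 - exp(-rate * x). A pure-numpy version of that curve (my assumption about what graph.cdf computes here, not a phasic call):

```python
import numpy as np

rate = 0.4
x = np.arange(0, 10, 0.01)
analytic_cdf = 1 - np.exp(-rate * x)  # CDF of an Exp(0.4) distribution
# plt.plot(x, analytic_cdf) should overlay the curve plotted above.
```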

Discrete phase-type distributions represent the number of jumps in a Markov chain before absorption. We can discretize our graph to get a geometric distribution:

graph = exponential(0.4)
rewards = graph.discretize(1-rate)
graph.plot()

The discretize method changes the graph in-place by connecting each non-absorbing vertex in the graph to an auxiliary (AUX) vertex with the specified rate. Each AUX vertex connects back to its parent with rate 1. Once the AUX vertices are added, the graph is normalized so that the edge weights of each vertex sum to 1. discretize returns the graph indices of the added AUX vertices.
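To make the mechanism concrete, here is a small numpy sketch (my reconstruction of the construction described above, not phasic code) for the exponential graph with rate 0.4 and AUX rate 1 - 0.4 = 0.6. Counting visits to the AUX vertex recovers a geometric distribution:

```python
import numpy as np

# After normalization, the transient states are [v, AUX]:
#   v   -> absorbing with prob 0.4, -> AUX with prob 0.6
#   AUX -> v with prob 1
P = np.array([[0.0, 0.6],
              [1.0, 0.0]])        # transient-to-transient probabilities
exit_prob = np.array([0.4, 0.0])  # transient-to-absorbing probabilities
pi = np.array([1.0, 0.0])         # chain starts in v

def pmf_aux_visits(k):
    # Exactly k AUX visits means the path v (AUX v)^k -> absorbing,
    # i.e. 2*k transient jumps before the absorbing one.
    return float(pi @ np.linalg.matrix_power(P, 2 * k) @ exit_prob)

# Matches the geometric pmf (1 - p)^k * p with p = 0.4
pmf = [pmf_aux_visits(k) for k in range(5)]
```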

x = np.arange(10)
# cdf = graph.reward_transform(rewards).cdf(x, discrete=True)
cdf = graph.reward_transform(rewards).cdf(x)
plt.hlines(y=cdf, xmin=x, xmax=x+1, zorder=1, color='lightblue')
plt.scatter(x, cdf, s=18);
Figure 2

You can also treat the discrete graph as a continuous distribution by passing discrete=False (or omitting the argument), in which case the returned distribution is over continuous time rather than discrete jumps:

rate = 0.4
graph = exponential(rate)
rewards = graph.discretize(1-rate)
x = np.arange(0, 10, 0.01)
# cdf = graph.reward_transform(rewards).cdf(x, discrete=False)
cdf = graph.reward_transform(rewards).cdf(x)
plt.plot(x, cdf);
Figure 3
Tip: Continuous / discrete method behavior

You can use many of the graph methods on both continuous and discrete graphs.

The methods moments, expectation, variance, covariance, pdf, cdf, distribution_context, sample, state_probability, and accumulated_occupancy will treat the graph as continuous unless the discrete=True keyword argument is passed. In this case moments and probabilities are computed across discrete jumps rather than across continuous time.

It is sometimes useful to interpret a discrete graph as a continuous distribution. However, passing discrete=True to a method called on a continuous graph will raise a ValueError, since a continuous graph has no discrete interpretation.

Methods without the discrete keyword argument either apply to both kinds of graphs (expected_waiting_time) or automatically redirect to functions handling each kind (normalize, reward_transform).

Tip

If you make the distribution discrete using discretize, it changes the is_discrete graph attribute to True.

NB: if you build a discrete graph manually, you need to either set graph.is_discrete = True, or call the appropriate “*_discrete” methods directly.

Let’s model discrete mutations with rate 0.1 along the branches of the coalescent tree, for example to get the distribution of accumulated mutations over time rather than over jumps. We could of course just make a new state-space creation function, but we can also manipulate an existing continuous one.

nr_samples = 4

@with_ipv([nr_samples]+[0]*(nr_samples-1))
def coalescent(state):
    transitions = []
    for i in range(state.size):
        for j in range(i, state.size):            
            same = int(i == j)
            if same and state[i] < 2:
                continue
            if not same and (state[i] < 1 or state[j] < 1):
                continue 
            new = state.copy()
            new[i] -= 1
            new[j] -= 1
            new[i+j+1] += 1
            transitions.append((new, state[i]*(state[j]-same)/(1+same)))
    return transitions
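As a standalone sanity check of the transition logic (plain Python, without phasic or the @with_ipv decorator), enumerating the moves from the initial state [4, 0, 0, 0] should yield a single transition: two singletons coalescing into a doubleton at rate 4·3/2 = 6:

```python
import numpy as np

def coalescent_transitions(state):
    # Same logic as the decorated coalescent() function above.
    state = np.asarray(state)
    transitions = []
    for i in range(state.size):
        for j in range(i, state.size):
            same = int(i == j)
            if same and state[i] < 2:
                continue
            if not same and (state[i] < 1 or state[j] < 1):
                continue
            new = state.copy()
            new[i] -= 1          # remove the two coalescing lineages ...
            new[j] -= 1
            new[i + j + 1] += 1  # ... and add the merged lineage
            transitions.append((new, state[i] * (state[j] - same) / (1 + same)))
    return transitions

trans = coalescent_transitions([4, 0, 0, 0])
```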

mutation_graph = Graph(coalescent)
mutation_graph.plot()

This process is automated by the discretize method, which accepts either a single rate of discrete events for all states (e.g. graph.discretize(0.7)) or a callable mapping each state vector to a rate. The following example produces the mutation graph above:

mutation_graph = Graph(coalescent)

def mutation(state: np.ndarray, mutation_rate: float):
    nr_lineages = sum(state)
    return nr_lineages * mutation_rate

mutation_rate = 0.1
rewards = mutation_graph.discretize(mutation, mutation_rate=mutation_rate)
Figure 4
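The rate callable simply scales the number of extant lineages by the per-lineage mutation rate; a quick standalone check (no phasic needed):

```python
import numpy as np

def mutation(state, mutation_rate):
    # sum(state) is the number of extant lineages in this state
    return sum(state) * mutation_rate

# Two singletons and one doubleton -> three lineages -> total rate 3 * 0.1
total_rate = mutation(np.array([2, 1, 0, 0]), 0.1)
```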

The auxiliary nodes added have dummy states with all zeros and are labelled as “AUX” vertices when the graph is plotted:

mutation_graph.plot()

If you want, you can assign nodes to subgraphs either by index or by state, by passing a callable as the by_index or by_state argument. For example, we can group the auxiliary nodes we added above into a separate subgraph:

def fun(index):
    if rewards[index]:
        return "Auxiliary states\ncollecting a\nreward (carrot)\neach time visited"
    else:
        return "Standard states"

mutation_graph.plot(by_index=fun)
Figure 5: Grouping states into auxiliary and standard states.

To compute the expected number of mutations, we apply rewards such that each mutation vertex gets a reward of 1 and all other states get a reward of 0. Since this is already encoded in the rewards array we made above, we can convert it to integers and pass it as rewards:

mutation_graph.expectation(rewards)
0.3666666666666667

We can verify that the expected number of mutations corresponds to the total branch length scaled by the mutation rate:

graph = Graph(coalescent)
tot_brlen = graph.expectation(graph.states().T.sum(axis=0))
tot_brlen * mutation_rate
0.3666666666666667
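This also matches the textbook value: under the Kingman coalescent, while k lineages remain, the waiting time to the next coalescence is exponential with rate k(k-1)/2, and all k lineages accumulate length during that epoch. So the expected total branch length is the sum over k = 2, …, n of k · 2/(k(k-1)). A quick check (pure Python; this is a standard result, not a phasic call):

```python
n = 4
mutation_rate = 0.1
# Expected total branch length: sum over epochs with k = 2, ..., n lineages
tot_brlen = sum(k * 2.0 / (k * (k - 1)) for k in range(2, n + 1))  # = 11/3
print(tot_brlen * mutation_rate)  # ~0.3667, matching the value above
```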

Or compare it to the empirical mean of samples from the models:

samples = mutation_graph.sample(10000, rewards=rewards)
samples.mean().item()
0.3459
samples = graph.sample(10000, rewards=graph.states().T.sum(axis=0))
samples.mean().item() * mutation_rate
0.3678173498168352

If we reward transform the graph, we can find the distribution function for the total number of mutations:

rev_trans_mutation_graph = mutation_graph.reward_transform_discrete(rewards)
rev_trans_mutation_graph.plot()

Notice that with this reward transformation the graph is no longer sparse, as all paths through the graph are represented.

mutations = np.arange(10)
pmf = rev_trans_mutation_graph.pdf(mutations)
cdf = rev_trans_mutation_graph.cdf(mutations)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 3), sharey=True)
x = np.arange(0, 10, 1)
ax1.bar(x, pmf)
ax1.set_title("PDF")
ax1.set_xlabel('Total number of carrots found')
left, right = x, np.arange(1, x.size+1, 1)
ax2.hlines(y=cdf, xmin=left, xmax=right, zorder=1)
ax2.scatter(left, cdf, s=18, zorder=2)
ax2.set_title("CDF")
ax2.set_xlabel('Total number of carrots found')
ax2.axhline(y=mutation_graph.defect(), linestyle='dotted') ;
Figure 6

Multivariate phase-type distributions (marginal)

We can compute marginal expectations using rewards. Instead of a univariate reward, we can have the distribution earn a vector of rewards for every time unit spent at each vertex. We can use the columns in our state matrix as reward vectors. Singleton rewards are:

graph = Graph(coalescent)
reward_matrix = graph.states().T

singleton = reward_matrix[0]
doubleton = reward_matrix[1]
tripleton = reward_matrix[2]

print(f"Singleton rewards: {singleton}")
Singleton rewards: [0 4 2 0 1 0]

Expected singleton branch length:

graph.expectation(singleton)
1.9999999999999996
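This agrees with the classical coalescent result (Fu 1995) that the expected total length of branches subtending i leaves is 2/i, independent of the sample size:

```python
# Classical result: E[L_i] = 2 / i for branches with i descendants.
expected_singleton = 2 / 1  # matches the phasic expectation above (~2)
expected_doubleton = 2 / 2
```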

Covariance of singleton and doubleton branch lengths:

graph.covariance(singleton, doubleton)
-0.22222222222222232

If need be, we can also sample from the multivariate distribution:

nsims = 1000000
samples = graph.sample_multivariate(nsims, graph.states())
samples.shape
(4, 1000000)

Sanity check: the empirical covariance matches:

float((samples[0, :] * samples[1, :]).mean() - samples[0, :].mean() * samples[1, :].mean())
-0.21917802907913675
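The expression above is the plug-in (ddof=0) covariance estimator. On synthetic stand-in data (a hypothetical fake array, since the real samples require phasic) it agrees with np.cov:

```python
import numpy as np

rng = np.random.default_rng(1)
fake = rng.normal(size=(2, 1000))  # stand-in for `samples`
n = fake.shape[1]
# Plug-in estimator: E[XY] - E[X]E[Y]
est = float((fake[0] * fake[1]).sum() / n
            - fake[0].sum() / n * fake[1].sum() / n)
ref = float(np.cov(fake, ddof=0)[0, 1])  # same estimator via numpy
```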