Discrete phase-type distributions

from phasic import Graph, with_ipv # ALWAYS import phasic first to set jax backend correctly
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import seaborn as sns
%config InlineBackend.figure_format = 'svg'
from vscodenb import set_vscode_theme

np.random.seed(42)
set_vscode_theme()
sns.set_palette('tab10')

Consider this simple graph representing an exponential distribution. If you must, you can make it the coalescence time of two lineages in very small population:

def exponential(state):
    transitions = []               # List for transitions from state
    if state[0] < 2:               # Control transitions allowed
        child = state.copy()       # Make a copy to modify
        child[0] += 1              # Make the child state
        trans = [child, 2.0]       # Pair the child with a transition rate
        transitions.append(trans)  # Append transition 
    return transitions

graph = Graph(exponential, ipv=[1])
graph.plot()

def exponential(rate):
    graph = Graph(1)
    vertex = graph.find_or_create_vertex([2])
    graph.starting_vertex().add_edge(vertex, 1)
    abs_vertex = graph.find_or_create_vertex([1])
    vertex.add_edge(abs_vertex, rate)
    return graph

graph = exponential(0.4)
graph.plot()

rate = 0.4
x = np.arange(0, 10, 0.01)
graph = exponential(rate)
plt.plot(x, graph.cdf(x))
plt.show()

Discrete phase-type distributions represent the the number of jumps in a Markov Chain before absorption. We can discretize our graph it to get a geometric distribution:

graph = exponential(0.4)
rewards = graph.discretize(1-rate)
graph.plot()

The discretize method changes to graph in-place, by connecting each non-absorbing vertex in the graph to an auxiliary vertex with a specified rate. Each AUX vertex connects to its parent with rate 1. Once AUX vertices are added, the graph is normalized so the sum of edge weights sum to 1 for each vertex. discretize returns the graph indices of the added AUX vertices.

x = np.arange(10)
# cdf = graph.reward_transform(rewards).cdf(x, discrete=True)
cdf = graph.reward_transform(rewards).cdf(x)
plt.hlines(y=cdf, xmin=x, xmax=x+1, zorder=1, color='lightblue')
plt.scatter(x, cdf, s=18)
plt.show()
Figure 1: Continuous (exponential) distribution

You can treat the discrete graph by passing discrete=False (omit the argument), in which case the returned distribution is over continuous time, rather than discrete jumps:

rate = 0.4
graph = exponential(rate)
rewards = graph.discretize(1-rate)
x = np.arange(0, 10, 0.01)
# cdf = graph.reward_transform(rewards).cdf(x, discrete=False)
cdf = graph.reward_transform(rewards).cdf(x)
plt.plot(x, cdf)
plt.show()
Figure 2: Treating geometric as continuous (exponential) distribution
TipContinuous / Discrete method behavior

You can use many of the graph methods on both continuous and discrete graphs.

The methods moments, expectation, variance, covariance, pdf, cdf, distribution_context, sample, state_probability, and accumulated_occupancy will treat the graph as continuous unless the discrete=True keyword argument is passed. In this case moments and probabilities are computed across discrete jumps rather than across continuous time.

It is sometimes useful to interpret a discrete graph as a continuous distribution. However, passing discrete=True to a method called on a continuous will raise a ValueError, since continuous graph has not discrete interpretation.

Methods without the discrete keyword argument, either apply to both kinds of graphs (expected_waiting_time), or automatically redirect to functions handling each kind (normalize, reward_transform).

Tip

If you make the distribution discrete using discretize, it changes the is_discrete graph attribute to True.

NB: if you make a discrete graph manually yourself, you need to either set graph.is_discrete = True, or call the appropriate “*_discrete” methods directly.

To model discrete mutations with rate 0.1 along branches of the coalescent tree. We could of course just make a new state-space creation function, but we can also manipulate an existing continuous one.

E.g. if you want to get the distribution of accumulated mutations over time rather than jumps.

Let’s model discrete mutations with rate 0.1 along branches of the coalescent tree. We could of course just make a new state-space creation function, but we can also manipulate an existing continuous one.

nr_samples = 4

@with_ipv([nr_samples]+[0]*(nr_samples-1))
def coalescent(state):
    transitions = []
    for i in range(state.size):
        for j in range(i, state.size):            
            same = int(i == j)
            if same and state[i] < 2:
                continue
            if not same and (state[i] < 1 or state[j] < 1):
                continue 
            new = state.copy()
            new[i] -= 1
            new[j] -= 1
            new[i+j+1] += 1
            transitions.append((new, state[i]*(state[j]-same)/(1+same)))
    return transitions

mutation_graph = Graph(coalescent)
mutation_graph.plot()

This process is automated by the discretize method, which accepts either a single rate of discrete events for all state (E.g. graph.discretize(0.7)), or a callable mapping each state vector to a rate. The following example produces the mutation graph above:

from typing import Optional

mutation_graph = Graph(coalescent)

def mutation(state:np.ndarray, mutation_rate:float):
    nr_lineages = sum(state)
    return nr_lineages * mutation_rate

mutation_rate = 0.1
rewards = mutation_graph.discretize(mutation, mutation_rate=mutation_rate)
Figure 3

The auxiliary nodes added have dummy states with all zeros and are labelled as “AUX” vertices when the graph is plotted:

mutation_graph.plot()

If you want, you can assign nodes to subgraphs either by index or by state, by passing a callable as the by_index or by_state argument. For example, we can group the auxiliary nodes we added above into a separate subgraph:

def fun(index):
    if rewards[index]:
        return "Auxiliary states\ncollecting a\nreward (mutations)\neach time visited"
    else:
        return "Standard states"

mutation_graph.plot(by_index=fun)
Figure 4: Grouping states into auxiliary and standard states.

To find compute the expected number of mutations, we apply rewards such that the mutation vertex get a reward of 1 and all other states get reward of 0. Since this is already encoded in the mutation_vertices array we made above, we can convert it to integers and pass it as rewards:

mutation_graph.expectation(rewards)
0.3666666666666667

We can verify that the expected number of mutations correspond to the total branch length scaled with the mutation rate:

graph = Graph(coalescent)
tot_brlen = graph.expectation(graph.states().T.sum(axis=0))
tot_brlen * mutation_rate
0.36666666666666664

Or compare it to emprical mean of samples from the models

samples = mutation_graph.sample(10000, rewards=rewards)
samples.mean().item()
0.3749
samples = graph.sample(10000, rewards=graph.states().T.sum(axis=0))
samples.mean().item() * mutation_rate
0.3679581289515518

If we reward transform the graph, we can find the distribution function for the the total number of mutations:

rev_trans_mutation_graph  = mutation_graph.reward_transform_discrete(rewards)
rev_trans_mutation_graph.plot()

Notice that with this reward transformation the graph is no longer sparse, as all paths through the graph are represented.

mutations = np.arange(10)
pmf = rev_trans_mutation_graph.pdf(mutations)
cdf = rev_trans_mutation_graph.cdf(mutations)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 3), sharey=True)
x = np.arange(0, 10, 1)
ax1.bar(x, pmf)
ax1.set_title("PDF")
ax1.set_xlabel('Total number of mutations')
left, right = x, np.arange(1, x.size+1, 1)
ax2.hlines(y=cdf, xmin=left, xmax=right, zorder=1)
ax2.scatter(left, cdf, s=18, zorder=2)
ax2.set_title("CDF")
ax2.set_xlabel('Total number of mutations')
ax2.axhline(y=mutation_graph.defect(), linestyle='dotted') ;
Figure 5: Blah blah blah blah…

Multivariate phase-type distributions (marginal)

We can compute marginal expectations using rewards. Instead of a univariate reward, we can have the distribution earn a vector of rewards for every time unit spent at each vertex. We can use the columns in our state matrix as reward vectors. Singleton rewards are:

graph = Graph(coalescent)
reward_matrix = graph.states().T

singleton = reward_matrix[0]
doubleton = reward_matrix[1]
tripleton = reward_matrix[2]

print(f"Singleton rewards: {singleton}")
Singleton rewards: [0 4 2 0 1 0]

Expected singleton branch length:

graph.expectation(singleton)
1.9999999999999996

Covariance of doubleton and tripleton branch length:

graph.covariance(singleton, doubleton)
-0.22222222222222232

If need be, we can also sample from the multivariate distribution:

nsims = 1000000
samples = graph.sample_multivariate(nsims, graph.states())
samples.shape
(4, 1000000)

Sanity check: The emprical covariance is ok:

float(sum(samples[0,:]*samples[1,:])/nsims - sum(samples[0,:])/nsims*sum(samples[1,:])/nsims)
-0.21739365791458232