from phasic import Graph # ALWAYS import phasic first to set jax backend correctly
import numpy as np
import seaborn as sns
%config InlineBackend.figure_format = 'svg'
from vscodenb import set_vscode_theme
np.random.seed(42)
set_vscode_theme()
sns.set_palette('tab10')State-space construction
This phase-type distribution models the time until all rabits have died We can find the expectation, variance, moments
def coalescent(nr_samples):
# we represent the state as a vector of integers, the numbers of lineages
# with "index" number of descendants
state_vector_length = nr_samples
# make an empty graph specifying the length of its state vectors
graph = Graph(state_vector_length)
# create the initial state where all samples are singletons
initial_state = np.zeros(state_vector_length, dtype=int)
initial_state[0] = nr_samples
# find the initial state or create it if it does not exist yet
vertex = graph.find_or_create_vertex(initial_state)
# add an edge from the starting vertex to the initial state
# signifying that the Markov chain starts in that state with
# probability 1
graph.starting_vertex().add_edge(vertex, 1)
# Graph index of the current vertex to process: The starting
# vertex is at index 0, so the initial_state is at index 1
index = 1
# For each vertex visited, we may create new child vertices not
# yet visited. The while loop makes sure we continue processing
# states until we have visited the ones we have created:
while index < graph.vertices_length():
# get the vertex at the current index
vertex = graph.vertex_at(index)
# get the state vector of that vertex
state = vertex.state()
# iterate over all combinations of the kinds of lineages
# (singleton, doubleton, etc):
for i in range(nr_samples):
for j in range(i, nr_samples):
# if the two kinds are the same, there need to be at
# least two lineages of that kind to coalesce
same = int(i == j)
if same and state[i] < 2:
continue
# otherwise, at least one of each kind
if not same and (state[i] < 1 or state[j] < 1):
continue
# create the new state vector to represent the child
# state produced by the coalescence event
new_state = state.copy()
new_state[i] -= 1
new_state[j] -= 1
new_state[i+j+1] += 1
# find the vertex with this state if it exists or
# create it otherwise
new_vertex = graph.find_or_create_vertex(new_state)
# compute the coalescent rate
rate = state[i]*(state[j]-same)/(1+same)
# add an edge from the current vertex to the new
# vertex and make its weight the coalescent rate
vertex.add_edge(new_vertex, rate)
# increment the index to process the next graph vertex
index += 1
return graphOr just:
def coalescent(nr_samples):
state_vector_length = nr_samples
graph = Graph(state_vector_length)
initial_state = np.zeros(state_vector_length, dtype=int)
initial_state[0] = nr_samples
vertex = graph.find_or_create_vertex(initial_state)
graph.starting_vertex().add_edge(vertex, 1)
index = 1
while index < graph.vertices_length():
vertex = graph.vertex_at(index)
state = vertex.state()
for i in range(nr_samples):
for j in range(i, nr_samples):
same = int(i == j)
if same and state[i] < 2:
continue
if not same and (state[i] < 1 or state[j] < 1):
continue
new_state = state.copy()
new_state[i] -= 1
new_state[j] -= 1
new_state[i+j+1] += 1
new_vertex = graph.find_or_create_vertex(new_state)
rate = state[i]*(state[j]-same)/(1+same)
vertex.add_edge(new_vertex, rate)
index += 1
return graphgraph = coalescent(4)
graph.vertices_length()6
You can access state vectors as a two-dimensional array:
graph.states()array([[0, 0, 0, 0],
[4, 0, 0, 0],
[2, 1, 0, 0],
[0, 2, 0, 0],
[1, 0, 1, 0],
[0, 0, 0, 1]], dtype=int32)
and you can plot the graph for visual inspection. See the next tutorial on visualizing graphs for more details.
Easy graph build with callback function
You can skip the boilerplate code by passing a callback function and initial state probabilities to Graph. The callback function must accept a list/array of integers specifying a state and it must return a list of lists, each list with two elements: a state (list of integers) and a rate (float). It will often look something like this:
def mygraph(state):
transitions = []
for i in state:
child_state = ...
rate = ...
transitions.append([child_state, rate])
return transitionsTo create a callback function, just think of rules of your model and ask yourself: “Given some state, what are the allowed transitions to other states and with what rates to these transitions occur?”. If the current state is “two singletons and one doubleton” ([2, 1, 0, 0]), the process can jump to “one singleton and one tripleton” ([1, 0, 1, 0]) with rate 2 and to “two doubletons” ([0, 2, 0, 0]) with rate 1. Here is a callback function building a graph for the coalescent in the say way as in our first example:
def coalescent(state):
transitions = []
for i in range(state.size):
for j in range(i, state.size):
same = int(i == j)
if same and state[i] < 2:
continue
if not same and (state[i] < 1 or state[j] < 1):
continue
new = state.copy()
new[i] -= 1
new[j] -= 1
new[i+j+1] += 1
transitions.append((new, state[i]*(state[j]-same)/(1+same)))
return transitionsAlong with the callback function, you must pass the ipv, a list of lists, each with a state and its initial probability. All additional keyword arguments are passed to Graph are passed on to your callback function.
nr_samples = 4
ipv = [([nr_samples]+[0]*(nr_samples-1), 1)]
graph = Graph(coalescent, ipv=ipv)If the ipv only has a single state Graph also accepts just the single state without the rate of 1:
nr_samples = 4
ipv = [nr_samples]+[0]*(nr_samples-1)
graph = Graph(coalescent, ipv=ipv)
graph.plot()Your callback is wrapped in decorator that makes your function return the ipv if an empty state vector is passed, does various type checks, and formats your return value. To expose this for testing, debugging, or convenience, you can decorate your callback yourself (in which case phasic will know and not do it again):
from phasic import with_ipv
@with_ipv([nr_samples]+[0]*(nr_samples-1))
def coalescent(state):
transitions = []
for i in range(state.size):
for j in range(i, state.size):
same = int(i == j)
if same and state[i] < 2:
continue
if not same and (state[i] < 1 or state[j] < 1):
continue
new = state.copy()
new[i] -= 1
new[j] -= 1
new[i+j+1] += 1
transitions.append((new, state[i]*(state[j]-same)/(1+same)))
return transitions
graph = Graph(coalescent)
graph.plot()Matrix interface
To allow imbedding in a matrix based workflow, a matrix-based representation of the phase-type distribution can be extracted from the graph. Note that the indices in the matrix representation do not correspond to vertex indices in the graph.
mats = graph.as_matrices()
print("Initial probability vector:")
print(mats.ipv)
print("Subintensity matrix:")
print(mats.sim)
print("States:")
print(mats.states)
print("Indicies:")
print(mats.indices)Initial probability vector:
[1. 0. 0. 0.]
Subintensity matrix:
[[-6. 6. 0. 0.]
[ 0. -3. 1. 2.]
[ 0. 0. -1. 0.]
[ 0. 0. 0. -1.]]
States:
[[4 0 0 0]
[2 1 0 0]
[0 2 0 0]
[1 0 1 0]]
Indicies:
[2 3 4 5]
Although computationally inefficient, the graph can also be constructed from an initial probability vector and a subintensity matrix:
graph = Graph.from_matrices(mats.ipv, mats.sim, mats.states)
graph.plot()If the state argument is not provided, matrix indices are used as matrix states:
Graph caching
When building graphs using callback functions, the graph cache (~/.phasic_cache/graphs/) allows fast reconstruction of graphs that take a long time to build. This especially true for models (like the coalescent) where each visited vertex requires nested for loops over the state vector.
The cache key is a SHA-256 hash of callback function’s AST (so whitespace/comment changes are ignored) and all other parameters passed to it including the IPV. So any change to the callback code or parameters will invalidate the cache.
from phasic import (
Graph, with_ipv, GraphCache,
print_graph_cache_info, get_graph_cache_stats
)
import numpy as np
import seaborn as sns
%config InlineBackend.figure_format = 'svg'
from vscodenb import set_vscode_theme
set_vscode_theme()
sns.set_palette('tab10')def coalescent(state, N=None):
transitions = []
for i in range(state.size):
for j in range(i, state.size):
same = int(i == j)
if same and state[i] < 2:
continue
if not same and (state[i] < 1 or state[j] < 1):
continue
new = state.copy()
new[i] -= 1
new[j] -= 1
new[i+j+1] += 1
transitions.append((new, state[i]*(state[j]-same)/(1+same) / N ))
return transitions
nr_samples = 40
initial = [([nr_samples]+[0]*(nr_samples-1), 1)]With |V| vertices and a state vector length of s, building this graph is O(|V| s^2).
%%time
graph = Graph(coalescent, ipv=initial, N=10000, graph_cache=True)CPU times: user 11.4 s, sys: 138 ms, total: 11.6 s
Wall time: 11.7 s
Reconstructing it from cache is only O(|V|).
%%time
graph = Graph(coalescent, ipv=initial, N=10000, graph_cache=True)CPU times: user 1.84 s, sys: 54.1 ms, total: 1.89 s
Wall time: 1.89 s
If graph_cache=True is not passed, the graph is built from scratch:
%%time
graph = Graph(coalescent, ipv=initial, N=10000)CPU times: user 7.7 s, sys: 16.7 ms, total: 7.72 s
Wall time: 7.72 s
Cache Inspection
print_graph_cache_info()Cache directory: /Users/kmt/.phasic_cache/graphs
Cached graphs: 1
Total size: 50.93 MB
get_graph_cache_stats(){'num_graphs': 1,
'total_size_mb': 50.928025245666504,
'cache_dir': '/Users/kmt/.phasic_cache/graphs'}
Custom Class Serialization
If you pass your own objects as parameters to the constructor callback, make sure they implement to_dict() and from_dict(). These are already implemented by StateIndexer.
from phasic.state_indexing import StateIndexer, Property
class MyModel:
def __init__(self, param1, param2):
self.param1 = param1
self.param2 = param2
def to_dict(self):
return {'param1': self.param1, 'param2': self.param2}
@classmethod
def from_dict(cls, data):
return cls(data['param1'], data['param2'])
# Now cacheable!
model = MyModel(1.0, 2.0)
graph = Graph(callback, model=model, graph_cache=True)
# StateIndexer already implements to_dict/from_dict
indexer = StateIndexer(lineage=[Property('descendants', max_value=10)])
graph = Graph(callback, indexer=indexer, graph_cache=True) # Works!
--------------------------------------------------------------------------- NameError Traceback (most recent call last) Cell In[48], line 17 15 # Now cacheable! 16 model = MyModel(1.0, 2.0) ---> 17 graph = Graph(callback, model=model, graph_cache=True) 19 # StateIndexer already implements to_dict/from_dict 20 indexer = StateIndexer(lineage=[Property('descendants', max_value=10)]) NameError: name 'callback' is not defined
Building the state space in C
Very very large models can be take a long time to construct. So if you have deloped a model that you need to construct repeatedly, the library allow you to implement the state construction as a stand-alone C/C++ extension available as python module.
The C code building the state space for the rabit model looks like this:
phasic::Graph build(int nr_samples) {
struct ptd_graph *graph = ptd_graph_create(nr_samples);
struct ptd_avl_tree *avl_tree = ptd_avl_tree_create(nr_samples);
int *initial_state = (int *) calloc(graph->state_length, sizeof(int));
initial_state[0] = nr_samples;
ptd_graph_add_edge(
graph->starting_vertex,
ptd_find_or_create_vertex(graph, avl_tree, initial_state),
1
);
free(initial_state);
int *state = (int *) calloc(graph->state_length, sizeof(int));
for (size_t k = 1; k < graph->vertices_length; k++) {
struct ptd_vertex *vertex = graph->vertices[k];
memcpy(state, vertex->state, graph->state_length * sizeof(int));
for (int i=1; i < nr_samples; ++i) {
for (int j=i; j < nr_samples; ++j) {
int same = i == j;
if (same && state[i] < 2) {
continue;
}
if (same > 0 && (state[i] < 1 or state[j] < 1)) {
continue ;
}
state[i]--;
state[j]--;
state[i+j+1]++;
struct ptd_vertex *new_vertex = ptd_find_or_create_vertex(graph, avl_tree, state);
ptd_graph_add_edge(vertex, child, weight);
}
}
}
free(state);
return graph;
}To access the function from python, you need to put it a separate file (coalescent_state_space.cpp) with the header and footer shown below:
// cppimport
#include <pybind11/pybind11.h>
#include <phasic.h>
namespace py = pybind11;
using namespace pybind11::literals; // to bring in the `_a` literal
/* Basic C libraries */
#include "stdint.h"
#include "stdlib.h"
/*******************************************/
/* Your build function goes here */
/********************************************/
PYBIND11_MODULE(coalescent_state_space, m) { /* <- NB: the model name must match the file name */
m.def("build", &build);
}
/*
<%
setup_pybind11(cfg)
%>
*/You can see the complete code in coalescent_state_space.cpp.
Then all you need to do is install cppimport
pixi add cppimportor
pip install cppimportand then run this code to import your build function.
# import phasic
import cppimport.import_hook
import coalescent_state_space # this will pause for a moment to compile the moduleThen you can use it to construct your graph like this:
graph = ptd.Graph(coalescent_state_space.build(2, 2, 4))
graph.plot()Then you can use it to construct your graph like this:
graph = ptd.Graph(coalescent_state_space.build(2, 2, 4))
graph.plot()