State-space construction

from phasic import Graph # ALWAYS import phasic first to set jax backend correctly
import numpy as np
import seaborn as sns
from vscodenb import set_vscode_theme

np.random.seed(42)
set_vscode_theme()
sns.set_palette('tab10')
Overriding theme from NOTEBOOK_THEME environment variable. <phasic._DeviceListFilter object at 0x16c030310>

This phase-type distribution models the time until all rabits have died We can find the expectation, variance, moments

def coalescent(nr_samples):

    # we represent the state as a vector of integers, the numbers of lineages 
    # with "index" number of descendants
    state_vector_length = nr_samples

    # make an empty graph specifying the length of its state vectors
    graph = Graph(state_vector_length)

    # create the initial state where all samples are singletons
    initial_state = np.zeros(state_vector_length, dtype=int)
    initial_state[0] = nr_samples

    # find the initial state or create it if it does not exist yet
    vertex = graph.find_or_create_vertex(initial_state)

    # add an edge from the starting vertex to the initial state
    # signifying that the Markov chain starts in that state with 
    # probability 1
    graph.starting_vertex().add_edge(vertex, 1)

    # Graph index of the current vertex to process: The starting 
    # vertex is at index 0, so the initial_state is at index 1
    index = 1

    # For each vertex visited, we may create new child vertices not
    # yet visited. The while loop makes sure we continue processing
    # states until we have visited the ones we have created:
    while index < graph.vertices_length():

        # get the vertex at the current index
        vertex = graph.vertex_at(index)
        # get the state vector of that vertex
        state = vertex.state()

        # iterate over all combinations of the kinds of lineages
        # (singleton, doubleton, etc):
        for i in range(nr_samples):
            for j in range(i, nr_samples): 

                # if the two kinds are the same, there need to be at
                # least two lineages of that kind to coalesce
                same = int(i == j)
                if same and state[i] < 2:
                    continue

                # otherwise, at least one of each kind
                if not same and (state[i] < 1 or state[j] < 1):
                    continue 

                # create the new state vector to represent the child
                # state produced by the coalescence event
                new_state = state.copy()
                new_state[i] -= 1
                new_state[j] -= 1
                new_state[i+j+1] += 1
                
                # find the vertex with this state if it exists or 
                # create it otherwise
                new_vertex = graph.find_or_create_vertex(new_state)

                # compute the coalescent rate
                rate = state[i]*(state[j]-same)/(1+same)

                # add an edge from the current vertex to the new 
                # vertex and make its weight the coalescent rate
                vertex.add_edge(new_vertex, rate)

        # increment the index to process the next graph vertex
        index += 1
    
    return graph

Or just:

def coalescent(nr_samples):
    state_vector_length = nr_samples
    graph = Graph(state_vector_length)
    initial_state = np.zeros(state_vector_length, dtype=int)
    initial_state[0] = nr_samples
    vertex = graph.find_or_create_vertex(initial_state)
    graph.starting_vertex().add_edge(vertex, 1)
    index = 1
    while index < graph.vertices_length():
        vertex = graph.vertex_at(index)
        state = vertex.state()
        for i in range(nr_samples):
            for j in range(i, nr_samples): 
                same = int(i == j)
                if same and state[i] < 2:
                    continue
                if not same and (state[i] < 1 or state[j] < 1):
                    continue 
                new_state = state.copy()
                new_state[i] -= 1
                new_state[j] -= 1
                new_state[i+j+1] += 1
                new_vertex = graph.find_or_create_vertex(new_state)
                rate = state[i]*(state[j]-same)/(1+same)
                vertex.add_edge(new_vertex, rate)
        index += 1
    return graph
graph = coalescent(4)
graph.vertices_length()
6

You can access state vectors as a two-dimensional array:

graph.states()
array([[0, 0, 0, 0],
       [4, 0, 0, 0],
       [2, 1, 0, 0],
       [0, 2, 0, 0],
       [1, 0, 1, 0],
       [0, 0, 0, 1]], dtype=int32)

and you can plot the graph for visual inspection. See the next tutorial on visualizing graphs for more details.

graph.plot()
Overriding theme from NOTEBOOK_THEME environment variable. <phasic._DeviceListFilter object at 0x16c030310>
Figure 1: State space graph.
TipVertex and Edge APIs

Vertices and Edges each have additional methods for advanced construction and manipulation of graphs not needed for normal use. To learn more, see the API documentation.

Easy graph build with callback function

You can skip the boilerplate code by passing a callback function and initial state probabilities to Graph. The callback function must accept a list/array of integers specifying a state and it must return a list of lists, each list with two elements: a state (list of integers) and a rate (float). It will often look something like this:

def mygraph(state):
    transitions = []
    for i in state:
        child_state = ...
        rate = ...
        transitions.append([child_state, rate])
    return transitions

To create a callback function, just think of rules of your model and ask yourself: “Given some state, what are the allowed transitions to other states and with what rates to these transitions occur?”. If the current state is “two singletons and one doubleton” ([2, 1, 0, 0]), the process can jump to “one singleton and one tripleton” ([1, 0, 1, 0]) with rate 2 and to “two doubletons” ([0, 2, 0, 0]) with rate 1. Here is a callback function building a graph for the coalescent in the say way as in our first example:

def coalescent(state):
    transitions = []
    for i in range(state.size):
        for j in range(i, state.size):            
            same = int(i == j)
            if same and state[i] < 2:
                continue
            if not same and (state[i] < 1 or state[j] < 1):
                continue 
            new = state.copy()
            new[i] -= 1
            new[j] -= 1
            new[i+j+1] += 1
            transitions.append((new, state[i]*(state[j]-same)/(1+same)))
    return transitions
Figure 2

Along with the callback function, you must pass the ipv, a list of lists, each with a state and its initial probability. All additional keyword arguments are passed to Graph are passed on to your callback function.

nr_samples = 4
ipv = [([nr_samples]+[0]*(nr_samples-1), 1)]
graph = Graph(coalescent, ipv=ipv)

If the ipv only has a single state Graph also accepts just the single state without the rate of 1:

nr_samples = 4
ipv = [nr_samples]+[0]*(nr_samples-1)
graph = Graph(coalescent, ipv=ipv)
graph.plot()
Overriding theme from NOTEBOOK_THEME environment variable. <phasic._DeviceListFilter object at 0x16c030310>
Figure 3: Using simple list for initial state.

Your callback is wrapped in decorator that makes your function return the ipv if an empty state vector is passed, does various type checks, and formats your return value. To expose this for testing, debugging, or convenience, you can decorate your callback yourself (in which case phasic will know and not do it again):

from phasic import with_ipv

@with_ipv([nr_samples]+[0]*(nr_samples-1))
def coalescent(state):
    transitions = []
    for i in range(state.size):
        for j in range(i, state.size):            
            same = int(i == j)
            if same and state[i] < 2:
                continue
            if not same and (state[i] < 1 or state[j] < 1):
                continue 
            new = state.copy()
            new[i] -= 1
            new[j] -= 1
            new[i+j+1] += 1
            transitions.append((new, state[i]*(state[j]-same)/(1+same)))
    return transitions

graph = Graph(coalescent)
graph.plot()
Overriding theme from NOTEBOOK_THEME environment variable. <phasic._DeviceListFilter object at 0x16c030310>
Figure 4: Using decorated callback.

Matrix interface

To allow imbedding in a matrix based workflow, a matrix-based representation of the phase-type distribution can be extracted from the graph. Note that the indices in the matrix representation do not correspond to vertex indices in the graph.

mats = graph.as_matrices()
print("Initial probability vector:")
print(mats.ipv)
print("Subintensity matrix:")
print(mats.sim)
print("States:")
print(mats.states)
print("Indicies:")
print(mats.indices)
Initial probability vector:
[1. 0. 0. 0.]
Subintensity matrix:
[[-6.  6.  0.  0.]
 [ 0. -3.  1.  2.]
 [ 0.  0. -1.  0.]
 [ 0.  0.  0. -1.]]
States:
[[4 0 0 0]
 [2 1 0 0]
 [0 2 0 0]
 [1 0 1 0]]
Indicies:
[2 3 4 5]

Although computationally inefficient, the graph can also be constructed from an initial probability vector and a subintensity matrix:

graph = Graph.from_matrices(mats.ipv, mats.sim, mats.states)
graph.plot()
Overriding theme from NOTEBOOK_THEME environment variable. <phasic._DeviceListFilter object at 0x16c030310>
Figure 5: Generated from matrices and state vectors.

If the state argument is not provided, matrix indices are used as matrix states:

graph = Graph.from_matrices(mats.ipv, mats.sim)
graph.plot()
Overriding theme from NOTEBOOK_THEME environment variable. <phasic._DeviceListFilter object at 0x16c030310>
Figure 6: Generated from only matrices.

Building the state space in C

Very very large models can be take a long time to construct. So if you have deloped a model that you need to construct repeatedly, the library allow you to implement the state construction as a stand-alone C/C++ extension available as python module.

The C code building the state space for the rabit model looks like this:

phasic::Graph build(int nr_samples) {

  struct ptd_graph *graph = ptd_graph_create(nr_samples);
  struct ptd_avl_tree *avl_tree = ptd_avl_tree_create(nr_samples);
  int *initial_state = (int *) calloc(graph->state_length, sizeof(int));
  initial_state[0] = nr_samples;
  ptd_graph_add_edge(
    graph->starting_vertex,
    ptd_find_or_create_vertex(graph, avl_tree, initial_state),
    1
  );
  free(initial_state);
  
  int *state = (int *) calloc(graph->state_length, sizeof(int));
  for (size_t k = 1; k < graph->vertices_length; k++) {

    struct ptd_vertex *vertex = graph->vertices[k];
    memcpy(state, vertex->state, graph->state_length * sizeof(int));
    
    for (int i=1; i < nr_samples; ++i) {
      for (int j=i; j < nr_samples; ++j) {
        int same = i == j;
        if (same && state[i] < 2) {
          continue;
        }
        if (same > 0 && (state[i] < 1 or state[j] < 1)) {
          continue ;
        }
        state[i]--;
        state[j]--;
        state[i+j+1]++;
      
        struct ptd_vertex *new_vertex = ptd_find_or_create_vertex(graph, avl_tree, state);
        ptd_graph_add_edge(vertex, child, weight);
      }
    }  
  }
  free(state);

  return graph;
}

To access the function from python, you need to put it a separate file (coalescent_state_space.cpp) with the header and footer shown below:

// cppimport
#include <pybind11/pybind11.h>
#include <phasic.h>

namespace py = pybind11;

using namespace pybind11::literals; // to bring in the `_a` literal

/* Basic C libraries */
#include "stdint.h"
#include "stdlib.h"
/*******************************************/


/* Your build function goes here */


/********************************************/
PYBIND11_MODULE(coalescent_state_space, m) {     /* <- NB: the model name must match the file name */
     m.def("build", &build);
}

/*
<%
setup_pybind11(cfg)
%>
*/

You can see the complete code in rabbit_state_space.cpp.

Then all you need to do is install cppimport

pixi add cppimport

or

pip install cppimport

and then run this code to import your build function.

# import phasic
import cppimport.import_hook
import coalescent_state_space # this will pause for a moment to compile the module

Then you can use it to construct your graph like this:

graph = ptd.Graph(coalescent_state_space.build(2, 2, 4))
graph.plot()