# Source code for pytrack.matching.mpmatching_utils

import itertools
import math

import networkx as nx
from shapely.geometry import LineString

from pytrack.graph import distance
from pytrack.matching import candidate

# Default ``sigma`` of the emission probability: an estimate of the magnitude
# of the GPS error (presumably expressed in metres -- TODO confirm).
SIGMA_Z = 4.07
# Default ``beta`` of the transition probability: describes the difference
# between route distances and great circle distances (presumably in the same
# unit as the graph's "length" edge attribute -- TODO confirm).
BETA = 3


def _emission_prob(dist, sigma=SIGMA_Z):
    """ Compute emission probability of a node

    The probability is the value of a zero-mean Gaussian density with standard
    deviation ``sigma`` evaluated at ``dist``::

        1 / (sigma * sqrt(2 * pi)) * exp(-0.5 * (dist / sigma) ** 2)

    Parameters
    ----------
    dist: float
        Distance between a real GPS point and a candidate node.
    sigma: float, optional, default: SIGMA_Z
        It is an estimate of the magnitude of the GPS error. See https://www.ismll.uni-hildesheim.de/lehre/semSpatial-10s/script/6.pdf
        for a more detailed description of its calculation.

    Returns
    -------
    ret: float
        Emission probability of a node.
    """
    c = 1 / (sigma * math.sqrt(2 * math.pi))
    # The 0.5 factor belongs to the Gaussian density; without it the curve is
    # narrower than ``sigma`` states and no longer matches the normaliser ``c``.
    return c * math.exp(-0.5 * (dist / sigma) ** 2)


# A Gaussian distribution
def emission_prob(u, sigma=SIGMA_Z):
    """ Compute emission probability of a node

    For a candidate that carries a distance, the probability is the value of a
    zero-mean Gaussian density with standard deviation ``sigma`` evaluated at
    ``u.great_dist``::

        1 / (sigma * sqrt(2 * pi)) * exp(-0.5 * (u.great_dist / sigma) ** 2)

    Parameters
    ----------
    u: pytrack.matching.Candidate
        Node of the graph. Only its ``great_dist`` attribute is read.
    sigma: float, optional, default: SIGMA_Z
        It is an estimate of the magnitude of the GPS error. See https://www.ismll.uni-hildesheim.de/lehre/semSpatial-10s/script/6.pdf
        for a more detailed description of its calculation.

    Returns
    -------
    ret: float
        Emission probability of a node. ``1`` when the candidate has no
        distance (``u.great_dist is None``).
    """
    # Test against None explicitly: a distance of exactly 0 is a legitimate
    # (perfect) match and must be scored by the density, not treated as a
    # candidate without a distance.
    if u.great_dist is None:
        return 1

    c = 1 / (sigma * math.sqrt(2 * math.pi))
    # The 0.5 factor belongs to the Gaussian density; without it the curve is
    # narrower than ``sigma`` states and no longer matches the normaliser ``c``.
    return c * math.exp(-0.5 * (u.great_dist / sigma) ** 2)
# An empirical (exponential) distribution
def transition_prob(G, u, v, beta=BETA):
    """ Compute transition probability between node u and v.

    For two candidates that both carry a distance, the probability is::

        1 / beta * exp(-delta / beta)

    where ``delta`` is the absolute difference between the shortest route
    length from ``u`` to ``v`` on ``G`` (edge attribute ``"length"``) and the
    haversine distance between their coordinates.

    Parameters
    ----------
    G: networkx.MultiDiGraph
        Road network graph.
    u: pytrack.matching.Candidate
        Starting node of the graph. Its ``great_dist``, ``node_id`` and
        ``coord`` attributes are read.
    v: pytrack.matching.Candidate
        Target node of the graph. Its ``great_dist``, ``node_id`` and
        ``coord`` attributes are read.
    beta: float
        This describes the difference between route distances and great circle distances. See
        https://www.ismll.uni-hildesheim.de/lehre/semSpatial-10s/script/6.pdf for a more detailed description of its
        calculation.

    Returns
    -------
    ret: float
        Transition probability between node u and v. ``1`` when either
        candidate has no distance (``great_dist is None``).

    Raises
    ------
    networkx.NetworkXNoPath
        Propagated from ``nx.shortest_path_length`` when ``v`` cannot be
        reached from ``u`` on ``G``.
    """
    # Test against None explicitly: a candidate lying exactly on its
    # observation has great_dist == 0 and must still get a real transition
    # probability instead of the neutral value.
    if u.great_dist is None or v.great_dist is None:
        return 1

    c = 1 / beta
    route_length = nx.shortest_path_length(G, u.node_id, v.node_id, weight="length", method='dijkstra')
    great_circle = distance.haversine_dist(*u.coord, *v.coord)
    delta = abs(route_length - great_circle)
    return c * math.exp(-delta / beta)
def create_trellis(results):
    """ Create a Trellis graph.

    The graph has an artificial ``"start"`` node, one layer of nodes per
    observation (named ``"<observation key>_<candidate index>"``) and an
    artificial ``"target"`` node. Every node of a layer is connected to every
    node of the following layer.

    Parameters
    ----------
    results: dict
        Output of ``candidate.get_candidates`` method. Each value is read
        through the keys ``"observation"``, ``"osmid"``, ``"edge_osmid"``,
        ``"dists"`` and ``"candidates"``.

    Returns
    -------
    G: networkx.DiGraph
        A directed acyclic Trellis graph.

    Raises
    ------
    ValueError
        If an observation has no candidates (its layer would be empty).
    """
    G = nx.DiGraph()

    # NOTE(review): the candidate attached to "target" is also built with the
    # id "start" -- looks like a copy-paste slip; confirm nothing relies on it
    # before changing.
    for terminal in ("start", "target"):
        G.add_node(terminal, candidate=candidate.Candidate("start", None, None, None, None))

    previous_layer = ["start"]
    for idx, item in results.items():
        obs = item["observation"]
        per_candidate = zip(item["osmid"], item["edge_osmid"], item["dists"], item["candidates"])

        layer = []
        for j, (node_id, edge_osmid, dist, cand) in enumerate(per_candidate):
            attrs = {"candidate": candidate.Candidate(node_id, edge_osmid, obs, dist, cand)}
            layer.append((f"{idx}_{j}", attrs))

        # Transposing the (name, attributes) pairs yields the tuple of node
        # names; the unpacking fails with ValueError when the layer is empty.
        nodes, _ = zip(*layer)

        G.add_nodes_from(layer)
        # Fully connect the previous layer to the current one.
        G.add_edges_from(itertools.product(previous_layer, nodes))
        previous_layer = nodes

    G.add_edges_from((u, "target") for u in previous_layer)
    return G
def create_path(G, trellis, predecessor):
    """ Create the path that best matches the actual GPS data.

    The values of ``predecessor`` are taken in reverse insertion order, each
    consecutive pair is joined by the shortest path on ``G`` (weighted by the
    ``"length"`` edge attribute), and consecutive duplicate node IDs are
    collapsed.

    Parameters
    ----------
    G: networkx.MultiDiGraph
        Road network graph.
    trellis: networkx.DiGraph
        A directed acyclic graph. Its nodes must carry a ``"candidate"``
        attribute exposing ``node_id``.
    predecessor: dict
        Predecessor for each node.

    Returns
    -------
    path_elab: list
        List of node IDs. Empty when ``predecessor`` holds fewer than two
        entries, since there is no pair of nodes to connect.
    """
    # Nothing was matched: return an empty path instead of failing while
    # unpacking an empty sequence.
    if not predecessor:
        return []

    # Reverse insertion order of the stored predecessor nodes.
    trellis_nodes = list(predecessor.values())[::-1]

    path_elab = [
        node
        for src, dst in zip(trellis_nodes, trellis_nodes[1:])
        for node in nx.shortest_path(G,
                                     trellis.nodes[src]["candidate"].node_id,
                                     trellis.nodes[dst]["candidate"].node_id,
                                     weight='length')
    ]

    # Each leg starts where the previous one ended, so drop the repeated
    # junction nodes (and any other consecutive duplicates).
    return [k for k, _ in itertools.groupby(path_elab)]
def create_matched_path(G, trellis, predecessor):
    """ Create the path that best matches the actual GPS points. Route created based on results obtained from
    ``pmatching_utils.viterbi_search`` and ``mpmatching_utils.create_trellis`` methods.

    Parameters
    ----------
    G: networkx.MultiDiGraph
        Street network graph used to create trellis graph.
    trellis: networkx.DiGraph
        A directed acyclic Trellis graph.
    predecessor: dict
        Predecessor for each node.

    Returns
    -------
    node_ids: list
        List of ids of the nodes that compose the path.
    path_coords: list
        List of nodes' coordinates, in the form of tuple (lat, lon), composing the path.
    """
    node_ids = create_path(G, trellis, predecessor)

    # Chain the "geometry" attribute of every node into a single line.
    geometries = [G.nodes[node]["geometry"] for node in node_ids]
    line = LineString(geometries)

    # The line yields (lng, lat) pairs; swap them into (lat, lng).
    path_coords = []
    for lng, lat in line.coords:
        path_coords.append((lat, lng))

    return node_ids, path_coords
def get_predecessor(target, predecessor):
    """ Reconstruct predecessor dictionary of a decoded trellis DAG.

    Walks ``predecessor`` backwards from ``target`` until the ``"start"`` node
    is reached. Each visited node is stored under the part of its name that
    precedes the first underscore.

    Parameters
    ----------
    target: str
        Target node of the trellis DAG.
    predecessor: dict
        Dictionary containing the predecessors of the nodes of a decoded Trellis DAG.

    Returns
    -------
    pred_elab: dict
        Dictionary containing the predecessors of the best nodes of a decoded Trellis DAG.
    """
    pred_elab = {}
    current = target
    while True:
        previous = predecessor[current]
        if previous == "start":
            break
        # Key on the prefix before the first "_" of the node name.
        key = current.split("_")[0]
        pred_elab[key] = previous
        current = previous
    return pred_elab