Source code for pytrack.graph.graph

import datetime as dt
from itertools import groupby

import networkx as nx
from shapely.geometry import Point, LineString

import pytrack
from . import distance
from . import download

useful_tags_way = [
    "bridge",
    "tunnel",
    "oneway",
    "lanes",
    "ref",
    "name",
    "highway",
    "maxspeed",
    "service",
    "access",
    "area",
    "landuse",
    "width",
    "est_width",
    "junction",
]


[docs]def graph_from_bbox(north, south, west, east, simplify=True, network_type='drive', custom_filter=None, buffer_dist=0): """ Create a graph from OpenStreetMap within some bounding box. Parameters ---------- north: float Northern latitude of bounding box. south: float southern latitude of bounding box. west: float Western longitude of bounding box. east: float Eastern longitude of bounding box. simplify: bool, optional, default: True if True, simplify graph topology with the ``simplify_graph`` method. network_type: str, optional, default: 'drive' Type of street network to obtain. custom_filter: str or None, optional, default: None Custom filter to be used instead of the predefined ones to query the Overpass API. buffer_dist: float, optional, default: 0 Distance in meters indicating how much to expand the bounding box. Returns ------- G: networkx.MultiDiGraph Street network graph. """ bbox_buffer = distance.enlarge_bbox(north, south, west, east, buffer_dist) response_json = download.osm_download(bbox_buffer, network_type=network_type, custom_filter=custom_filter) G = create_graph(response_json) if simplify: G.graph["simplified"] = True G = _simplification(G, response_json) else: G.graph["simplified"] = False return G
def _to_simplify(segment): """ Method that determines whether a segment of the graph must be simplified. Parameters ---------- segment: list Graph segment to be evaluated. A segment is a list of node IDs. Returns ------- ret: bool Whether or not to simplify a segment. """ if len(segment) > 2: return True else: return False def _simplification(G, response_json): """ Method that simplify a networkx graph. Parameters ---------- G: networkx.MultiDiGraph Street network graph. response_json: json Response of OpenStreetMap API service got with``osm_download``method Returns ------- G: networkx.MultiDiGraph Simplified street network graph. """ _, paths = get_nodes_edges(response_json) for path in paths.values(): is_oneway = _is_oneway(path, False) nodes = path.pop("nodes") junction = [False, False] if is_oneway: junction[1:1] = [True if G.degree[node] > 2 else False for node in nodes[1:-1]] # Changed >3 to >2 V2.0.4 else: junction[1:1] = [True if G.degree[node] > 4 else False for node in nodes[1:-1]] all_nodes_to_remove = [] all_edges_to_add = [] for segment in _split_at_values(nodes, junction): if _to_simplify(segment): all_edges_to_add.append(_build_edge(G, segment, path, is_oneway, reverse=False)) if not is_oneway: all_edges_to_add.append(_build_edge(G, segment, path, is_oneway, reverse=True)) all_nodes_to_remove.extend(segment[1:-1]) G.remove_nodes_from(set(all_nodes_to_remove)) for edge in all_edges_to_add: G.add_edge(edge["origin"], edge["destination"], **edge["attr_dict"]) return G def _build_edge_attribute(G, segment, segment_attributes, is_oneway): """ Method that build the dictionary of attributes of a segment. Parameters ---------- G: networkx.MultiDiGraph Street network graph. segment: list Graph segment to be evaluated. A segment is a list of node IDs. segment_attributes: dict Dictionary of road segment attributes. is_oneway: bool Indicates whether a street segment is oneway. Returns ------- attribute: dict Dictionary of road segment attributes. """ attribute = segment_attributes.copy() # attribute = {k: segment_attributes[k] for k in useful_tags_way if k in segment_attributes} attribute["geometry"] = LineString([Point((G.nodes[node]["x"], G.nodes[node]["y"])) for node in segment]) attribute["oneway"] = is_oneway lat, lon = zip(*[(G.nodes[node]["y"], G.nodes[node]["x"]) for node in segment]) edge_lengths = sum(distance.haversine_dist(lat[:-1], lon[:-1], lat[1:], lon[1:])).round(3) attribute["length"] = edge_lengths return attribute def _build_edge(G, segment, segment_attributes, is_oneway, reverse=False): """ Method that builds an edge of the network street graph. Parameters ---------- G: networkx.MultiDiGraph Street network graph. segment: list Graph segment to be evaluated. A segment is a list of node IDs. segment_attributes: dict Dictionary of road segment attributes. is_oneway: bool Indicates whether a street segment is oneway. reverse: bool, optional, default: False Indicates whether invert the direction of the edge. Returns ------- edge: dict Edges to be added to the street network graph. """ if not reverse: segment_attributes = _build_edge_attribute(G, segment, segment_attributes, is_oneway) edge = {"origin": segment[0], "destination": segment[-1], "attr_dict": segment_attributes} return edge else: segment_attributes = _build_edge_attribute(G, segment[::-1], segment_attributes, is_oneway) edge = {"origin": segment[-1], "destination": segment[0], "attr_dict": segment_attributes} return edge def _split_at_values(nodes, junction): """ Method that creates a networkx.MultiDiGraph representing a network street graph. Parameters ---------- nodes: list List of node IDs. junction: list It is a list of boolean conditions. If true, split the segment, otherwise not. Returns ------- ret: generator Street network graph. """ indices = [i for i, x in enumerate(nodes) if junction[i]] for start, end in zip([0, *indices], [*indices, len(nodes)]): yield nodes[start:end + 1]
[docs]def create_graph(response_json): """ Method that creates a networkx.MultiDiGraph representing a network street graph. Parameters ---------- response_json: json Response of OpenStreetMap API service got with``osm_download``method. Returns ------- G: networkx.MultiDiGraph Street network graph. """ # TODO: nella creazione del grafo rimuovi intersezioni multiple, in modo tale che c'è un solo nodo intersezione # per grafo create the graph as a MultiDiGraph and set its meta-attributes metadata = { 'created_date': "{:%Y-%m-%d %H:%M:%S}".format(dt.datetime.now()), 'created_with': f"PyTrack {pytrack.__version__}", 'crs': "epsg:4326", 'geometry': False } G = nx.MultiDiGraph(**metadata) nodes, paths = get_nodes_edges(response_json) # add each osm node to the graph for node, data in nodes.items(): G.add_node(node, **data) add_edges(G, paths, bidirectional=False) # add length (haversine distance between nodes) attribute to each edge if len(G.edges) > 0: G = distance.add_edge_lengths(G) return G
def _oneway_path_values(path): """ Checks whether an OSM path is oneway. Parameters ---------- path: dict Dictionary that describes an OSM path. Returns ------- ret: dict Indicates whether an OSM path is oneway. """ return {path[key] for key in path.keys() if key.startswith("oneway")} # Removed 'and path[key] == "no"' v2.0.4 def _is_oneway(path, bidirectional): """ Checks whether an OSM path is oneway. Parameters ---------- path: dict Dictionary that describes an OSM path. bidirectional: bool Indicates whether an edge is bidirectional. Returns ------- is_oneway: bool Indicates whether an OSM path is oneway. """ no_oneway_values = {"no", "false", "0", "reversible", "alternating"} oneway_path_values = _oneway_path_values(path) is_oneway = oneway_path_values.isdisjoint(no_oneway_values) and ( not not oneway_path_values) and not bidirectional return is_oneway def _is_reversed(path): """ Checks whether an OSM path is reversed. Parameters ---------- path: dict Dictionary that describes an OSM path. Returns ------- is_reversed: bool Indicates whether an OSM path is reversed. """ reversed_values = {"-1", "reverse", "T"} oneway_path_values = _oneway_path_values(path) is_reversed = not oneway_path_values.isdisjoint(reversed_values) return is_reversed
[docs]def add_edges(G, paths, bidirectional=False, all_oneway=False): """ Add OSM edges to a ``networkx.MultiDiGraph``. Parameters ---------- G: networkx.MultiDiGraph Street network graph. paths: dict Dictionary of OSM paths. bidirectional: bool, optional, default: False Indicates whether an edge is bidirectional. all_oneway: bool, optional, default: False Indicates whether an edge is oneway. """ # TODO: "junction": "roundabout" è oneway for path in paths.values(): nodes = path.pop('nodes') is_oneway = _is_oneway(path, bidirectional) is_reversed = _is_reversed(path) if is_oneway and is_reversed: nodes.reverse() if not all_oneway: path['oneway'] = is_oneway edges = list(zip(nodes[:-1], nodes[1:])) if not is_oneway: edges.extend([(v, u) for u, v in edges]) G.add_edges_from(edges, **path)
[docs]def convert_edge(element): """ Convert an OSM edge into an edge to construct a street network graph. Parameters ---------- element: dict An OSM path. Returns ------- path: dict Dictionary for an OSM path. """ # add OSM path id and remove consecutive duplicate nodes in the list of path's nodes path = {'osmid': element['id'], 'nodes': [group for group, _ in groupby(element['nodes'])]} # add tags path.update(element['tags']) return path
[docs]def convert_node(element): """ Convert an OSM node into a node to construct a street network graph. Parameters ---------- element: dict An OSM node. Returns ------- path: dict Dictionary for an OSM node. """ # add node's GPS coordinates node = {'y': element['lat'], 'x': element['lon']} # add tags node.update(element['tags']) if "tags" in element else None return node
[docs]def get_nodes_edges(response_json): """ Extract nodes and paths from the OpenStreetMap query response. Parameters ---------- response_json: json Response of OpenStreetMap API service got with``osm_download``method Returns ------- nodes: dict Dictionary of OSM nodes. paths: dict Dictionary of OSM paths. """ nodes = dict() paths = dict() for element in response_json['elements']: if element['type'] == "node": nodes[element['id']] = convert_node(element) elif element['type'] == "way": paths[element['id']] = convert_edge(element) return nodes, paths