Skip to content
Snippets Groups Projects
EffectiveVelocities.py 7.05 KiB
Newer Older
Jonas Wassmer's avatar
Jonas Wassmer committed
# %%
import networkx as nx
import numpy as np
import osmnx as ox
import warnings
import matplotlib.pyplot as plt
import matplotlib as mpl


def map_highway_to_number_of_lanes(highway_type):
    """Return the default lane count assumed for a highway classification.

    Args:
        highway_type: Highway class value (e.g. ``"motorway"``). Any value
            that does not equal one of the known class strings falls
            through to the single-lane default.

    Returns:
        4 for motorway/trunk, 3 for primary, 2 for secondary and the
        ``*_link`` ramps of the major classes, otherwise 1.
    """
    # Tuples (not sets/dicts) so that unhashable inputs such as lists are
    # still compared by equality instead of raising.
    four_lane_classes = ("motorway", "trunk")
    two_lane_classes = (
        "secondary",
        "motorway_link",
        "trunk_link",
        "primary_link",
    )

    if highway_type in four_lane_classes:
        return 4
    if highway_type == "primary":
        return 3
    if highway_type in two_lane_classes:
        return 2
    return 1


def set_number_of_lanes(G):
    """Give every edge of ``G`` a numeric ``lanes`` attribute.

    The edge table is obtained with ``ox.graph_to_gdfs`` and each edge's
    ``lanes`` value is normalised:

    * a list of values is converted element-wise with ``int`` and averaged
      (so the result may be non-integer),
    * a string is converted with ``int``,
    * NaN (missing) is replaced by the default for the edge's ``highway``
      value from ``map_highway_to_number_of_lanes``,
    * any other value is kept as is.

    Args:
        G: Graph accepted by ``ox.graph_to_gdfs``; it is modified in place.

    Returns:
        The same graph object ``G`` with the ``lanes`` edge attribute set.
    """

    def _lane_count(lanes, highway):
        # Resolve one edge's raw ``lanes`` value to a number.
        if isinstance(lanes, list):
            return np.mean([int(item) for item in lanes])
        if isinstance(lanes, str):
            return int(lanes)
        if np.isnan(lanes):
            return map_highway_to_number_of_lanes(highway)
        return lanes

    edges = ox.graph_to_gdfs(G, nodes=False)

    # If no edge carries a "lanes" value the column does not exist at all;
    # treat every edge as missing so the highway defaults are used.
    if "lanes" in edges.columns:
        raw_lanes = edges["lanes"]
    else:
        raw_lanes = [np.nan] * len(edges)

    # Iterate the columns directly rather than via iterrows(): indexing a
    # label-indexed row with v[0] relies on positional fallback, which
    # pandas deprecates.
    edges["lanes"] = [
        _lane_count(lanes, highway)
        for lanes, highway in zip(raw_lanes, edges["highway"])
    ]
    nx.set_edge_attributes(G, edges["lanes"], "lanes")
    return G


def peak_loads(G, N_road):
    """Return a copy of ``G`` whose edges carry a ``peak_load`` attribute.

    Each edge that has a ``load`` attribute receives
    ``N_road * population * (load / total_load)``, where ``population`` is
    the sum of the nodes' ``population`` attributes and ``total_load`` is
    the sum of all edge loads. The copy also has its ``lanes`` attribute
    normalised by ``set_number_of_lanes``.

    Args:
        G: Input graph; it is copied, not modified.
        N_road: Scaling factor applied to the total population.

    Returns:
        The annotated copy of ``G``.
    """
    graph = set_number_of_lanes(G.copy())

    total_population = sum(nx.get_node_attributes(graph, "population").values())
    edge_loads = nx.get_edge_attributes(graph, "load")
    load_sum = sum(edge_loads.values())

    # Distribute the scaled population over the edges by load share.
    scaled_loads = {}
    for edge, load in edge_loads.items():
        scaled_loads[edge] = N_road * total_population * (load / load_sum)

    nx.set_edge_attributes(graph, scaled_loads, "peak_load")
    return graph


def peak_loads_and_capacities(G, N_road):
    """Return a copy of ``G`` annotated with ``peak_load`` and ``capacity``.

    ``peak_load`` is ``N_road * population * (load / total_load)`` for every
    edge that has a ``load`` attribute. ``capacity`` is computed for every
    edge as ``length * max(lanes - 1, 1)`` divided by a fixed per-vehicle
    term.

    Args:
        G: Input graph; it is copied, not modified.
        N_road: Scaling factor applied to the total population.

    Returns:
        The annotated copy of ``G`` (``lanes`` is normalised as well).
    """
    graph = set_number_of_lanes(G.copy())

    total_population = sum(nx.get_node_attributes(graph, "population").values())
    edge_loads = nx.get_edge_attributes(graph, "load")

    # One lane is subtracted from each edge, but never below a single lane.
    usable_lanes = {
        edge: max(count - 1, 1)
        for edge, count in nx.get_edge_attributes(graph, "lanes").items()
    }
    edge_lengths = nx.get_edge_attributes(graph, "length")

    load_sum = sum(edge_loads.values())
    scaled_loads = {}
    for edge, load in edge_loads.items():
        scaled_loads[edge] = N_road * total_population * (load / load_sum)

    # Per-vehicle term: 2 * (5 * 1000 / 3600) + 5.
    # NOTE(review): presumably reaction time (2) times a 5 km/h floor speed
    # converted to m/s, plus vehicle length (5), matching t_react / x_veh in
    # effective_velocities -- confirm units.
    per_vehicle_term = 2 * 5 * 1000 / 60 / 60 + 5
    edge_capacities = {}
    for edge in graph.edges(keys=True):
        edge_capacities[edge] = (
            edge_lengths[edge] * usable_lanes[edge] / per_vehicle_term
        )

    nx.set_edge_attributes(graph, scaled_loads, "peak_load")
    nx.set_edge_attributes(graph, edge_capacities, "capacity")
    return graph


def reroute_overloaded_roads(Gc, N_road):
    """Push load exceeding an edge's capacity back onto its incoming edges.

    Peak loads and capacities are computed with
    ``peak_loads_and_capacities``. Every edge whose load exceeds its
    capacity is capped at the capacity and the excess is split evenly over
    all edges entering the edge's start node. This repeats until no edge
    exceeds its capacity by more than ``1e-5``. The result is stored in the
    ``spillover_load`` edge attribute.

    If the total peak load is larger than the total capacity a warning is
    printed and the graph is returned as is; in that case no
    ``spillover_load`` attribute is set.

    Args:
        Gc: Input multigraph with directed edges; it is not modified.
        N_road: Scaling factor forwarded to ``peak_loads_and_capacities``.

    Returns:
        An annotated copy of ``Gc``.
    """
    # peak_loads_and_capacities already works on its own copy of the graph,
    # so no additional copy is needed here.
    G = peak_loads_and_capacities(Gc, N_road)

    # Named edge_loads (not peak_loads) to avoid shadowing the module-level
    # peak_loads() function.
    edge_loads = nx.get_edge_attributes(G, "peak_load")
    max_caps = nx.get_edge_attributes(G, "capacity")

    if sum(edge_loads.values()) > sum(max_caps.values()):
        print(
            "Warning: Total loads greater than capacity. Will not converge returning without rerouting."
        )
        return G

    over_idxs = [e for e in G.edges(keys=True) if (edge_loads[e] - max_caps[e]) > 0]
    nx.set_edge_attributes(G, edge_loads, "spillover_load")

    while over_idxs:
        for u, v, k in over_idxs:
            edge_data = G[u][v][k]
            capacity = edge_data["capacity"]
            diff_load = edge_data["spillover_load"] - capacity
            edge_data["spillover_load"] = capacity

            # Ask for keys so that each parallel edge receives its own share;
            # indexing key 0 unconditionally would pile the shares of all
            # parallel edges onto one edge (or fail if key 0 does not exist).
            pred_edges = list(G.in_edges(u, keys=True))
            if not pred_edges:
                # No incoming edge: the excess cannot be passed on and is
                # dropped, as in the original behaviour.
                continue

            share = diff_load / len(pred_edges)
            for n, m, key in pred_edges:
                G[n][m][key]["spillover_load"] += share

        edge_loads = nx.get_edge_attributes(G, "spillover_load")
        over_idxs = [
            e for e in G.edges(keys=True) if (edge_loads[e] - max_caps[e]) > 1e-5
        ]

    return G


# Disabled alternative version of reroute_overloaded_roads. It is kept as a
# bare module-level string literal, so none of it is ever executed. This
# variant walks nx.bfs_edges(..., reverse=True) from the start node of each
# overloaded edge and moves excess load onto edges that still have spare
# capacity, printing a warning if some overload remains.
# NOTE: the opening delimiter has four quotes, so the string content itself
# starts with a stray '"' character.
""""
def reroute_overloaded_roads(G, N_road):
    Gc = peak_loads_and_capacities(G, N_road)

    peak_load = nx.get_edge_attributes(Gc, 'peak_load')
    max_caps = nx.get_edge_attributes(Gc, 'capacity')

    if sum(peak_load.values()) > sum(max_caps.values()):
        print('Warning: Total loads greater than capacity. Will not converge returning without rerouting.')
        return Gc

    over_idxs = [ e for e in Gc.edges(keys=True) if (peak_load[e]-max_caps[e]) > 1e-7 ]

    nx.set_edge_attributes(Gc, peak_load, 'spillover_load')

    for u,v,k in over_idxs:
        diff_load = Gc[u][v][k]['spillover_load'] - Gc[u][v][k]['capacity']

        pred_edges = nx.bfs_edges(Gc, u, reverse=True)

        for (m, n) in pred_edges:
            max_comp = Gc[n][m][0]['capacity'] - Gc[n][m][0]['spillover_load']
            if max_comp >= diff_load:
                Gc[n][m][0]['spillover_load'] += diff_load
                Gc[u][v][k]['spillover_load'] -= diff_load
                break
            elif (max_comp > 0) & (max_comp < diff_load):
                Gc[n][m][0]['spillover_load'] += max_comp
                Gc[u][v][k]['spillover_load'] -= max_comp
                diff_load -= max_comp


    spillover_load = nx.get_edge_attributes(Gc, 'spillover_load')
    max_caps = nx.get_edge_attributes(Gc, 'capacity')

    over_spills = [ (spillover_load[e]-max_caps[e]) for e in Gc.edges(keys=True) if (spillover_load[e]-max_caps[e]) > 1e-7 ]
    if len(over_spills)>0:
        print('Warning: not all overloaded roads could be rerouted.')
        print('Overload:', sum(over_spills))
    return Gc
"""


def effective_spillover_velocities(G, N_road):
    """Compute per-edge velocities and travel times from rerouted loads.

    The graph is first processed by ``reroute_overloaded_roads``. For every
    edge the velocity is

        (length * max(lanes - 1, 1) / (load * t_react) - x_veh / t_react)
        * 3600 / 1000

    capped at the edge's ``speed_kph`` from above and at ``v_min`` from
    below (the same floor ``effective_velocities`` uses), otherwise rounded
    to one decimal. The travel time is ``length / 1000 * 3600 / velocity``
    rounded to one decimal.

    Args:
        G: Input graph; it is not modified.
        N_road: Scaling factor forwarded to ``reroute_overloaded_roads``.

    Returns:
        An annotated copy of ``G`` with the edge attributes
        ``spillover_velocity`` and ``spillover_travel_time``.
    """
    x_veh = 5
    t_react = 2
    v_min = 5  # lower velocity bound, same value as in effective_velocities

    Gc = reroute_overloaded_roads(G, N_road)

    spillover_loads = nx.get_edge_attributes(Gc, "spillover_load")
    unrerouted_loads = nx.get_edge_attributes(Gc, "peak_load")
    lanes = nx.get_edge_attributes(Gc, "lanes")
    lengths = nx.get_edge_attributes(Gc, "length")
    speed_lims = nx.get_edge_attributes(Gc, "speed_kph")

    eff_velos = {}
    for e in Gc.edges(keys=True):
        # reroute_overloaded_roads returns without setting "spillover_load"
        # when the total load exceeds the total capacity; use the
        # un-rerouted peak load then instead of failing with a KeyError.
        load = spillover_loads[e] if e in spillover_loads else unrerouted_loads[e]

        if load <= 0:
            # No traffic on this edge: the formula diverges, so the edge is
            # driven at its speed limit (and a division by zero is avoided).
            eff_velos[e] = speed_lims[e]
            continue

        lanes_mod = max(lanes[e] - 1, 1)
        velocity = (
            (lengths[e] * lanes_mod / (load * t_react) - x_veh / t_react)
            * 60
            * 60
            / 1000
        )

        if velocity > speed_lims[e]:
            eff_velos[e] = speed_lims[e]
        elif velocity < v_min:
            # Keeps velocities positive when loads stay above capacity, so
            # the travel time below is always finite and non-negative.
            eff_velos[e] = v_min
        else:
            eff_velos[e] = np.round(velocity, 1)

    rr_travel_time = {
        e: np.round(np.divide(lengths[e] / 1000 * 60 * 60, eff_velos[e]), 1)
        for e in Gc.edges(keys=True)
    }

    nx.set_edge_attributes(Gc, rr_travel_time, "spillover_travel_time")
    nx.set_edge_attributes(Gc, eff_velos, "spillover_velocity")
    return Gc


def effective_velocities(G, N_road):
    """Compute per-edge velocities and travel times from peak loads.

    The graph is first processed by ``peak_loads_and_capacities``. For every
    edge the velocity is

        (length * max(lanes - 1, 1) / (peak_load * t_react) - x_veh / t_react)
        * 3600 / 1000

    capped at the edge's ``speed_kph`` from above and at ``v_min`` from
    below, otherwise rounded to one decimal. The travel time is
    ``length / 1000 * 3600 / velocity`` rounded to one decimal.

    Args:
        G: Input graph; it is not modified.
        N_road: Scaling factor forwarded to ``peak_loads_and_capacities``.

    Returns:
        An annotated copy of ``G`` with the edge attributes
        ``effective_velocity`` and ``effective_travel_time``.
    """
    x_veh = 5
    t_react = 2
    v_min = 5  # lower velocity bound applied to every edge

    Gc = peak_loads_and_capacities(G, N_road)

    edge_loads = nx.get_edge_attributes(Gc, "peak_load")
    lanes = nx.get_edge_attributes(Gc, "lanes")
    lengths = nx.get_edge_attributes(Gc, "length")
    speed_lims = nx.get_edge_attributes(Gc, "speed_kph")

    eff_velos = {}
    for e in Gc.edges(keys=True):
        load = edge_loads[e]

        if load <= 0:
            # No traffic on this edge: the formula diverges, so the edge is
            # driven at its speed limit (and a division by zero is avoided).
            eff_velos[e] = speed_lims[e]
            continue

        lanes_mod = max(lanes[e] - 1, 1)
        velocity = (
            (lengths[e] * lanes_mod / (load * t_react) - x_veh / t_react)
            * 60
            * 60
            / 1000
        )

        if velocity > speed_lims[e]:
            eff_velos[e] = speed_lims[e]
        elif velocity < v_min:
            eff_velos[e] = v_min
        else:
            eff_velos[e] = np.round(velocity, 1)

    travel_time = {
        e: np.round(np.divide(lengths[e] / 1000 * 60 * 60, eff_velos[e]), 1)
        for e in Gc.edges(keys=True)
    }

    nx.set_edge_attributes(Gc, travel_time, "effective_travel_time")
    nx.set_edge_attributes(Gc, eff_velos, "effective_velocity")
    return Gc


# %%