# %%
import warnings

import matplotlib as mpl
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import osmnx as ox

# Parameters of the car-following model used throughout this module.  The
# units follow from the conversions in the formulas below (m/s <-> km/h via
# ``* 60 * 60 / 1000``), assuming edge ``length`` is given in metres
# (TODO confirm against the graph source).
_VEHICLE_LENGTH_M = 5  # ``x_veh``: road length occupied by one vehicle
_REACTION_TIME_S = 2  # ``t_react``: time gap kept between vehicles
_MIN_VELOCITY_KPH = 5  # slowest velocity considered; it defines "capacity"

# Excess over capacity below this value is not treated as an overload.
_OVERLOAD_TOLERANCE = 1e-5


def map_highway_to_number_of_lanes(highway_type):
    """Return a default lane count for an OSM ``highway`` value.

    Parameters
    ----------
    highway_type
        Value of the edge's ``highway`` attribute.

    Returns
    -------
    int
        4 for motorway/trunk, 3 for primary, 2 for secondary and the
        motorway/trunk/primary link types, 1 for anything else (including
        values that are not plain strings, e.g. lists).
    """
    if highway_type in ("motorway", "trunk"):
        return 4
    if highway_type == "primary":
        return 3
    if highway_type in (
        "secondary",
        "motorway_link",
        "trunk_link",
        "primary_link",
    ):
        return 2
    return 1


def _lanes_from_tags(lanes, highway):
    """Turn one edge's raw ``lanes`` value into a number."""
    if isinstance(lanes, list):
        # Several values on one edge: use their mean.
        return np.mean([int(n) for n in lanes])
    if isinstance(lanes, str):
        return int(lanes)
    if lanes is None or np.isnan(lanes):
        # No lane information: fall back to a default for the road class.
        return map_highway_to_number_of_lanes(highway)
    return lanes


def set_number_of_lanes(G):
    """Write a numeric ``lanes`` attribute onto every edge of ``G``.

    Existing ``lanes`` values are converted to numbers (lists are averaged,
    strings are cast to ``int``); edges without a value get a default derived
    from their ``highway`` attribute.

    Parameters
    ----------
    G
        Multigraph accepted by ``osmnx.graph_to_gdfs``.  It is modified in
        place.

    Returns
    -------
    The same graph object ``G``.
    """
    edges = ox.graph_to_gdfs(G, nodes=False)
    if "lanes" in edges.columns:
        raw_lanes = edges["lanes"]
    else:
        # No edge carries a lane count at all: treat every value as missing.
        raw_lanes = [np.nan] * len(edges)

    # Columns are accessed by label; positional ``row[0]`` lookups on a
    # label-indexed Series are deprecated in recent pandas versions.
    lanes = {
        edge: _lanes_from_tags(n_lanes, highway)
        for edge, n_lanes, highway in zip(
            edges.index, raw_lanes, edges["highway"]
        )
    }
    nx.set_edge_attributes(G, lanes, "lanes")
    return G


def _lanes_mod(lanes):
    """Return the lane count reduced by one, but never less than one."""
    return max(lanes - 1, 1)


def _peak_load_per_edge(G, N_road):
    """Distribute ``N_road * total population`` over edges by ``load`` share."""
    population = sum(nx.get_node_attributes(G, "population").values())
    loads = nx.get_edge_attributes(G, "load")
    total_load = sum(loads.values())
    if total_load == 0:
        # No traffic anywhere: avoid 0 / 0 and report zero peak load.
        return dict.fromkeys(loads, 0.0)
    return {
        edge: N_road * population * (load / total_load)
        for edge, load in loads.items()
    }


def peak_loads(G, N_road):
    """Return a copy of ``G`` with ``lanes`` and ``peak_load`` edge attributes.

    Parameters
    ----------
    G
        Graph with a ``population`` node attribute and a ``load`` edge
        attribute.  It is not modified.
    N_road
        Factor multiplied with the total population before it is split over
        the edges (presumably the share of the population on the road at
        peak time -- TODO confirm).

    Returns
    -------
    A copy of ``G`` where each edge's ``peak_load`` is
    ``N_road * total_population * load / total_load``.
    """
    Gc = set_number_of_lanes(G.copy())
    nx.set_edge_attributes(Gc, _peak_load_per_edge(Gc, N_road), "peak_load")
    return Gc


def peak_loads_and_capacities(G, N_road):
    """Return a copy of ``G`` with ``peak_load`` and ``capacity`` on each edge.

    ``capacity`` is the load at which the velocity model used by
    ``effective_velocities`` yields exactly ``_MIN_VELOCITY_KPH``:
    ``length * lanes_mod / (t_react * v_min + x_veh)``.

    Parameters
    ----------
    G
        Graph with ``population`` node attributes and ``load`` and ``length``
        edge attributes.  It is not modified.
    N_road
        See ``peak_loads``.

    Returns
    -------
    A copy of ``G`` with ``lanes``, ``peak_load`` and ``capacity`` set.
    """
    Gc = peak_loads(G, N_road)
    lanes = nx.get_edge_attributes(Gc, "lanes")
    lengths = nx.get_edge_attributes(Gc, "length")
    # Road length one vehicle needs at the minimum velocity
    # (reaction distance in metres plus the vehicle itself).
    space_per_vehicle = (
        _REACTION_TIME_S * _MIN_VELOCITY_KPH * 1000 / 60 / 60
        + _VEHICLE_LENGTH_M
    )
    capacities = {
        e: lengths[e] * _lanes_mod(lanes[e]) / space_per_vehicle
        for e in Gc.edges(keys=True)
    }
    nx.set_edge_attributes(Gc, capacities, "capacity")
    return Gc


def reroute_overloaded_roads(Gc, N_road):
    """Push load exceeding an edge's capacity back onto its predecessor edges.

    Every edge gets a ``spillover_load`` attribute that starts at its
    ``peak_load``.  While any edge is above capacity, its excess is removed
    and split evenly over all edges entering the edge's start node
    (parallel edges each receive their own share).

    If the total load exceeds the total capacity, a warning is issued and the
    graph is returned with ``spillover_load`` equal to ``peak_load``.

    NOTE(review): the loop has no iteration cap.  If excess load keeps
    circulating among edges that cannot absorb it, it may not terminate.

    Parameters
    ----------
    Gc
        Input graph (see ``peak_loads_and_capacities``).  It is not modified.
    N_road
        See ``peak_loads``.

    Returns
    -------
    A copy of the graph with ``peak_load``, ``capacity`` and
    ``spillover_load`` edge attributes.
    """
    # peak_loads_and_capacities already works on a copy of its input.
    G = peak_loads_and_capacities(Gc, N_road)
    loads = nx.get_edge_attributes(G, "peak_load")
    capacities = nx.get_edge_attributes(G, "capacity")

    # Set before the early return so that callers always find the attribute.
    nx.set_edge_attributes(G, loads, "spillover_load")

    if sum(loads.values()) > sum(capacities.values()):
        warnings.warn(
            "Total loads greater than capacity. Rerouting will not converge; "
            "returning without rerouting.",
            stacklevel=2,
        )
        return G

    def find_overloaded(current_loads):
        return [
            e
            for e in G.edges(keys=True)
            if current_loads[e] - capacities[e] > _OVERLOAD_TOLERANCE
        ]

    lost_load = 0.0
    overloaded = find_overloaded(loads)
    while overloaded:
        for u, v, k in overloaded:
            data = G[u][v][k]
            excess = data["spillover_load"] - data["capacity"]
            data["spillover_load"] = data["capacity"]

            # keys=True so that every parallel edge is addressed by its own
            # key instead of piling all shares onto key 0.
            feeders = list(G.in_edges(u, keys=True))
            if not feeders:
                # Nowhere to push the excess to; it leaves the system.
                lost_load += excess
                continue
            share = excess / len(feeders)
            for n, m, key in feeders:
                G[n][m][key]["spillover_load"] += share

        overloaded = find_overloaded(
            nx.get_edge_attributes(G, "spillover_load")
        )

    if lost_load > 0:
        warnings.warn(
            f"{lost_load:.3g} units of load could not be rerouted because "
            "overloaded edges had no predecessor edges. This may impact the "
            "results.",
            stacklevel=2,
        )
    return G


# Earlier breadth-first variant of reroute_overloaded_roads, kept for
# reference only.  It is a bare string literal and is never executed.
"""
def reroute_overloaded_roads(G, N_road):
    Gc = peak_loads_and_capacities(G, N_road)
    peak_load = nx.get_edge_attributes(Gc, 'peak_load')
    max_caps = nx.get_edge_attributes(Gc, 'capacity')
    if sum(peak_load.values()) > sum(max_caps.values()):
        print('Warning: Total loads greater than capacity. Will not converge returning without rerouting.')
        return Gc
    over_idxs = [
        e for e in Gc.edges(keys=True) if (peak_load[e]-max_caps[e]) > 1e-7
    ]
    nx.set_edge_attributes(Gc, peak_load, 'spillover_load')
    for u,v,k in over_idxs:
        diff_load = Gc[u][v][k]['spillover_load'] - Gc[u][v][k]['capacity']
        pred_edges = nx.bfs_edges(Gc, u, reverse=True)
        for (m, n) in pred_edges:
            max_comp = Gc[n][m][0]['capacity'] - Gc[n][m][0]['spillover_load']
            if max_comp >= diff_load:
                Gc[n][m][0]['spillover_load'] += diff_load
                Gc[u][v][k]['spillover_load'] -= diff_load
                break
            elif (max_comp > 0) & (max_comp < diff_load):
                Gc[n][m][0]['spillover_load'] += max_comp
                Gc[u][v][k]['spillover_load'] -= max_comp
                diff_load -= max_comp
    spillover_load = nx.get_edge_attributes(Gc, 'spillover_load')
    max_caps = nx.get_edge_attributes(Gc, 'capacity')
    over_spills = [
        (spillover_load[e]-max_caps[e]) for e in Gc.edges(keys=True) if (spillover_load[e]-max_caps[e]) > 1e-7
    ]
    if len(over_spills)>0:
        print('Warning: not all overloaded roads could be rerouted.')
        print('Overload:', sum(over_spills))
    return Gc
"""


def _model_velocity_kph(length, lanes, load):
    """Velocity in km/h at which ``load`` vehicles fit onto one edge."""
    if load <= 0:
        # An empty edge is limited only by its speed limit (applied by the
        # caller); this also avoids dividing by zero.
        return np.inf
    return (
        (
            length * _lanes_mod(lanes) / (load * _REACTION_TIME_S)
            - _VEHICLE_LENGTH_M / _REACTION_TIME_S
        )
        * 60
        * 60
        / 1000
    )


def _set_velocities_and_travel_times(Gc, loads, velocity_attr, time_attr):
    """Derive velocity and travel time per edge from ``loads``; store both."""
    lanes = nx.get_edge_attributes(Gc, "lanes")
    lengths = nx.get_edge_attributes(Gc, "length")
    speed_limits = nx.get_edge_attributes(Gc, "speed_kph")

    velocities = {}
    travel_times = {}
    for e in Gc.edges(keys=True):
        velocity = _model_velocity_kph(lengths[e], lanes[e], loads[e])
        if velocity > speed_limits[e]:
            velocity = speed_limits[e]
        elif velocity < _MIN_VELOCITY_KPH:
            velocity = _MIN_VELOCITY_KPH
        else:
            velocity = np.round(velocity, 1)
        velocities[e] = velocity
        # length / 1000 * 3600 / (km/h) gives seconds if length is in metres.
        travel_times[e] = np.round(
            np.divide(lengths[e] / 1000 * 60 * 60, velocity), 1
        )

    nx.set_edge_attributes(Gc, travel_times, time_attr)
    nx.set_edge_attributes(Gc, velocities, velocity_attr)
    return Gc


def effective_spillover_velocities(G, N_road):
    """Return a copy of ``G`` with velocities based on rerouted loads.

    Runs ``reroute_overloaded_roads`` and converts each edge's
    ``spillover_load`` into ``spillover_velocity`` (km/h, capped at the
    edge's ``speed_kph`` and floored at ``_MIN_VELOCITY_KPH``) and
    ``spillover_travel_time``.  The floor only matters when rerouting was
    skipped or incomplete, because a load at capacity corresponds exactly to
    the minimum velocity.

    Parameters
    ----------
    G
        Graph with ``population`` node attributes and ``load``, ``length``
        and ``speed_kph`` edge attributes.  It is not modified.
    N_road
        See ``peak_loads``.
    """
    Gc = reroute_overloaded_roads(G, N_road)
    spillover_loads = nx.get_edge_attributes(Gc, "spillover_load")
    return _set_velocities_and_travel_times(
        Gc, spillover_loads, "spillover_velocity", "spillover_travel_time"
    )


def effective_velocities(G, N_road):
    """Return a copy of ``G`` with velocities based on peak loads.

    Converts each edge's ``peak_load`` into ``effective_velocity`` (km/h,
    capped at the edge's ``speed_kph`` and floored at ``_MIN_VELOCITY_KPH``)
    and ``effective_travel_time``.  Edges without load get their speed limit.

    Parameters
    ----------
    G
        Graph with ``population`` node attributes and ``load``, ``length``
        and ``speed_kph`` edge attributes.  It is not modified.
    N_road
        See ``peak_loads``.
    """
    Gc = peak_loads_and_capacities(G, N_road)
    edge_peak_loads = nx.get_edge_attributes(Gc, "peak_load")
    return _set_velocities_and_travel_times(
        Gc, edge_peak_loads, "effective_velocity", "effective_travel_time"
    )


# %%