# %% import networkx as nx import osmnx as ox import os import numpy as np import pickle import time import multiprocessing from pathlib import Path from tqdm import tqdm import src.GermanMobiltyPanel as gmp def flows_od(pop_o, pops, dists, mobility_fit): """Returns dict with flows from origin (o) to all destinations (d). {d : f_o} pop_o : population at origin o pops : population dictionary dists : distances from o to destinations """ denominator = sum(mobility_fit(dist) * pops[l] for (l, dist) in dists.items()) return { d: pop_o * pops[d] * mobility_fit(dist) / denominator for (d, dist) in dists.items() } def flows(G, pops, mobility_fit, path_attr, cutoff=None): """Returns generator with flows from all origins to all destinations. (o : {d} : f_o}) G : road graph path_attr : edge weight for shortest path algorithm. """ return ( (flows_od(pops[o], pops, dists, mobility_fit), paths) for o, (dists, paths) in nx.all_pairs_dijkstra( G, cutoff=cutoff, weight=path_attr ) ) def compute_loads_OLD( graph, path_attr="travel_time", disable_tqdm=False, cutoff="default" ): """Return graph object with commuter loads as edge weight attr on all edges.""" G = graph.copy() max_bin, popt_exp, popt_lin = gmp.mobility_fit_params( "data/MOP-data/mobility/", path_attr, bincount=250 ) mobility_fit = ( lambda x: gmp.exp_func(x, *popt_exp) if x > max_bin else gmp.lin_func(x, *popt_lin) ) L = dict(zip(G.edges(keys=True), np.zeros(len(G.edges())))) pops = nx.get_node_attributes(G, "population") if cutoff == "default": if path_attr == "length": cutoff = 60 * 1000 # 60 km elif path_attr == "travel_time": cutoff = 60 * 60 # 60 mins if not disable_tqdm: print("Computing loads...") for fo, paths in tqdm( flows(G, pops, mobility_fit, path_attr, cutoff=cutoff), total=len(G.nodes()), disable=disable_tqdm, ): for d, path in paths.items(): f_od = fo[d] for i, j in zip(path[:-1], path[1:]): L[(i, j, 0)] += f_od nx.set_edge_attributes(G, L, "load") return G def compute_loads(graph, weight="travel_time", cutoff="default", disable_tqdm=False): """Return graph object with commuter loads as edge weight attr on all edges.""" G = graph.copy() max_bin, popt_exp, popt_lin = gmp.mobility_fit_params( "data/MOP-data/mobility/", weight, bincount=250 ) mobility_fit = ( lambda x: gmp.exp_func(x, *popt_exp) if x > max_bin else gmp.lin_func(x, *popt_lin) ) L = dict(zip(G.edges(keys=True), np.zeros(len(G.edges())))) populations = nx.get_node_attributes(G, "population") if cutoff == "default": if weight == "length": cutoff = 60 * 1000 # 60 km elif weight == "travel_time": cutoff = 60 * 60 # 60 mins if not disable_tqdm: print("Computing loads...") for o, (dist, path) in tqdm( nx.all_pairs_dijkstra(G, cutoff=cutoff, weight=weight), total=len(G.nodes()), disable=disable_tqdm, ): No = populations[o] denominator = sum( mobility_fit(dist) * populations[l] for (l, dist) in dist.items() ) for d, path_od in path.items(): Nd = populations[d] dist_od = dist[d] fod = No * Nd * mobility_fit(dist_od) / denominator L.update( { (i, j, 0): L[(i, j, 0)] + fod for i, j in zip(path_od[:-1], path_od[1:]) } ) # for i, j in zip(path_od[:-1], path_od[1:]): # L[(i, j, 0)] += fod nx.set_edge_attributes(G, L, "load") return G def commuter_loads(G, path_attr="travel_time", cache=True, cutoff="default"): """Return graph object with commuter loads as edge weight attr on all edges.""" if cache: hash = nx.weisfeiler_lehman_graph_hash( ox.get_digraph(G), edge_attr=path_attr, node_attr="population", iterations=10, digest_size=32, ) path = f"data/cache/load-files/{hash}.pkl" if os.path.isfile(path): print( "The load has already been calculated and saved. Returning this output." ) with open(path, "rb") as f: load = pickle.load(f) nx.set_edge_attributes(G, load, "load") # return G else: G = compute_loads(G, weight=path_attr, cutoff=cutoff, disable_tqdm=False) load = nx.get_edge_attributes(G, "load") with open(path, "wb") as f: pickle.dump(load, f) # return G else: G = compute_loads(G, weight=path_attr, cutoff=cutoff, disable_tqdm=False) return G