# %%
from multiprocess import Pool
import multiprocess
import networkx as nx
import numpy as np
import time
import os
import osmnx as ox
import pickle

from src import GermanMobiltyPanel as gmp


# %%
def parallel_load_with_cache(
    graph,
    cpu_cores=5,
    cache=True,
    jobs_per_cpu=5,
    cutoff="default",
    weight="travel_time",
):
    """Compute edge loads, reusing a cached result if this graph has been processed before."""
    if cache:
        # Hash the graph (including edge weights and node populations) so that
        # identical inputs map to the same cache file.
        graph_hash = nx.weisfeiler_lehman_graph_hash(
            ox.get_digraph(graph),
            edge_attr=weight,
            node_attr="population",
            iterations=10,
            digest_size=32,
        )
        path = f"data/cache/load-files/{graph_hash}.pkl"
        if os.path.isfile(path):
            print("Load already computed. Returning saved values.")
            with open(path, "rb") as f:
                load = pickle.load(f)
            nx.set_edge_attributes(graph, load, "load")
        else:
            graph = parallel_load(
                graph,
                cpu_cores=cpu_cores,
                jobs_per_cpu=jobs_per_cpu,
                weight=weight,
                cutoff=cutoff,
            )
            load = nx.get_edge_attributes(graph, "load")
            with open(path, "wb") as f:
                pickle.dump(load, f)
    else:
        graph = parallel_load(
            graph,
            cpu_cores=cpu_cores,
            jobs_per_cpu=jobs_per_cpu,
            weight=weight,
            cutoff=cutoff,
        )
    return graph


def parallel_load(
    graph, cpu_cores=5, jobs_per_cpu=5, cutoff="default", weight="travel_time"
):
    """Distribute trips between all node pairs along shortest paths and accumulate
    the resulting edge loads, parallelised over chunks of origin nodes."""
    # Expose the worker at module level for the multiprocessing pool.
    global load_to_orig

    if cutoff == "default":
        if weight == "length":
            cutoff = 60 * 1000  # 60 km
        elif weight == "travel_time":
            cutoff = 60 * 60  # 60 min

    # Fit the empirical mobility distribution (linear below max_bin, exponential above).
    max_bin, popt_exp, popt_lin = gmp.mobility_fit_params(
        "data/MOP-data/mobility/", "travel_time", bincount=250
    )

    def mobility_fit(x):
        return gmp.exp_func(x, *popt_exp) if x > max_bin else gmp.lin_func(x, *popt_lin)

    population = nx.get_node_attributes(graph, "population")
    zeros = np.zeros(len(graph.edges), dtype=np.float64)

    def load_to_orig(origin_list):
        """Return the per-edge load contributed by trips starting at the given origin nodes."""
        loaddict = dict(zip(graph.edges(keys=True), zeros))
        for o in origin_list:
            dist, paths = nx.single_source_dijkstra(
                graph, o, weight=weight, cutoff=cutoff
            )
            No = population[o]
            # Normalisation over all destinations reachable within the cutoff.
            denominator = sum(
                mobility_fit(d_od) * population[node] for node, d_od in dist.items()
            )
            for d, path in paths.items():
                Nd = population[d]
                fod = No * Nd * mobility_fit(dist[d]) / denominator
                for i, j in zip(path[:-1], path[1:]):
                    # Attribute the flow to the first parallel edge (key 0).
                    loaddict[i, j, 0] += fod
        return np.array(list(loaddict.values()))

    # cpu_count() reports /all/ CPUs on the node.
    # This is not what we should use.
    ncpus = multiprocess.cpu_count()
    print(f"detected {ncpus} cores")

    # This will report the number of CPU cores SLURM
    # has allocated us.
    # This is the correct number to pass to Pool().
    try:
        ncpus = int(os.environ["SLURM_JOB_CPUS_PER_NODE"])
        # ncpus = int(os.environ.get("SLURM_CPUS_PER_TASK"))
        print(f"my Slurm allocation is {ncpus} cores")
    except KeyError:
        ncpus = cpu_cores
        print(f"Not running under Slurm, setting ncpus to {ncpus}")

    # Split the nodes into ncpus * jobs_per_cpu equally sized chunks.
    node_arr = np.array(list(graph.nodes()))
    input_list_len = ncpus * jobs_per_cpu
    vertices_per_proc = int(np.floor(len(node_arr) / input_list_len))
    node_list_proc = node_arr[: input_list_len * vertices_per_proc].reshape(
        input_list_len, vertices_per_proc
    )
    # The last chunk can be shorter than the rest; it is processed first and
    # doubles as the timing sample for the runtime estimate.
    remainder = node_arr[input_list_len * vertices_per_proc :]
    if len(remainder) > 0:
        nodes_last_col = remainder
        pool_chunks = node_list_proc
    else:
        nodes_last_col = node_list_proc[-1]
        pool_chunks = node_list_proc[:-1]

    # Estimate the total runtime from the serial run on the last chunk.
    start = time.time()
    load_last_col = load_to_orig(nodes_last_col)
    end = time.time()
    est_time = round((end - start) * len(graph.nodes()) / (ncpus * len(nodes_last_col)))
    print(f"Estimated time order: O({est_time} s)")

    # Start multiprocessing over the remaining chunks.
    start = time.time()
    with Pool(processes=ncpus) as pool:
        # future = pool.map(parallel_loads, list(G.nodes()))
        future = pool.map_async(load_to_orig, pool_chunks)
        load_arr = np.sum(future.get(), axis=0)
        nx.set_edge_attributes(
            graph,
            dict(zip(graph.edges(keys=True), load_arr + load_last_col)),
            "load",
        )
        # loadp = nx.get_edge_attributes(G, "parallel_load")
    end = time.time()
    print("Time:", round(end - start, 1), "seconds")

    return graph


# %%
"""
west, east = 6.75, 7.35
south, north = 50.35, 50.65
date = "2021-01-02"
region = rn.RoadNetwork(north, east, south, west, date, "all")
region.loads("travel_time")
H = region.graph
H = parallel_load(H, 40, cutoff=None)
pload = nx.get_edge_attributes(H, "parallel_load")
load = nx.get_edge_attributes(H, "load")
# %%
# plt.scatter(load.values(), pload.values())
# %%
"""
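
# %%
# Hedged usage sketch (not part of the original pipeline): it assumes an osmnx
# MultiDiGraph whose edges carry "travel_time" weights and whose nodes carry a
# "population" attribute, and that the MOP mobility data and cache directory
# referenced above exist. The place name, the uniform placeholder population,
# and the core counts below are illustrative assumptions only.
if __name__ == "__main__":
    G = ox.graph_from_place("Bonn, Germany", network_type="drive")
    G = ox.add_edge_speeds(G)        # impute edge speeds from OSM highway tags
    G = ox.add_edge_travel_times(G)  # derive per-edge travel times
    nx.set_node_attributes(G, 1.0, "population")  # placeholder population values
    G = parallel_load_with_cache(G, cpu_cores=4, jobs_per_cpu=5)
    load = nx.get_edge_attributes(G, "load")
    print(f"computed load for {len(load)} edges")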