# NOTE: the lines below are repository web-viewer residue, kept as comments.
# Skip to content
# Snippets Groups Projects
# ParallelLoad.py 5.13 KiB
# Newer Older
# Jonas Wassmer's avatar
# Jonas Wassmer committed
# %%
from multiprocess import Pool
import multiprocess
import networkx as nx
import numpy as np
import time
import os
import osmnx as ox
import pickle

from src import GermanMobiltyPanel as gmp


# %%
def parallel_load_with_cache(
    graph,
    cpu_cores=5,
    cache=True,
    jobs_per_cpu=5,
    cutoff="default",
    weight="travel_time",
):
    """Compute edge loads with ``parallel_load``, optionally using a disk cache.

    When ``cache`` is true, a Weisfeiler-Lehman hash of the graph (taking the
    ``weight`` edge attribute and the ``"population"`` node attribute into
    account) names a pickle file under ``data/cache/load-files/``. If that file
    exists, the stored ``"load"`` edge attributes are applied to ``graph``
    instead of recomputing them; otherwise the loads are computed and stored.

    Args:
        graph: Graph accepted by ``ox.get_digraph`` and ``parallel_load``.
        cpu_cores: Forwarded to ``parallel_load``.
        cache: Whether to read from / write to the on-disk cache.
        jobs_per_cpu: Forwarded to ``parallel_load``.
        cutoff: Forwarded to ``parallel_load``. Non-default values become part
            of the cache file name so results for different cutoffs are never
            mixed up.
        weight: Edge attribute used as path weight and as part of the hash.

    Returns:
        The graph with the ``"load"`` edge attribute set.
    """

    def compute():
        # Single place where the actual (expensive) computation is triggered.
        return parallel_load(
            graph,
            cpu_cores=cpu_cores,
            jobs_per_cpu=jobs_per_cpu,
            weight=weight,
            cutoff=cutoff,
        )

    if not cache:
        return compute()

    graph_hash = nx.weisfeiler_lehman_graph_hash(
        ox.get_digraph(graph),
        edge_attr=weight,
        node_attr="population",
        iterations=10,
        digest_size=32,
    )
    # The graph hash does not depend on the cutoff, but the loads do. Keep the
    # historic file name for the default cutoff (so existing cache files stay
    # valid) and add a suffix for every explicit cutoff.
    cutoff_tag = "" if cutoff == "default" else f"-cutoff-{cutoff}"
    path = f"data/cache/load-files/{graph_hash}{cutoff_tag}.pkl"

    if os.path.isfile(path):
        print("Load already computed. Returning saved values.")
        # NOTE: pickle executes arbitrary code on load; the cache directory
        # must only contain files written by this function.
        with open(path, "rb") as f:
            load = pickle.load(f)
        nx.set_edge_attributes(graph, load, "load")
        return graph

    graph = compute()
    load = nx.get_edge_attributes(graph, "load")
    # Make sure the cache directory exists so the finished computation is not
    # lost to a FileNotFoundError when writing the result.
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(load, f)
    return graph


def parallel_load(
    graph, cpu_cores=5, jobs_per_cpu=5, cutoff="default", weight="travel_time"
):
    """Compute a population-weighted "load" for every edge of ``graph``.

    For each origin node, shortest paths to all nodes within ``cutoff`` are
    computed. The flow from origin ``o`` to destination ``d`` is
    ``N_o * N_d * f(dist_od) / sum_l(f(dist_ol) * N_l)`` where ``N`` is the
    ``"population"`` node attribute and ``f`` the fitted mobility function.
    That flow is added to every edge of the shortest path. The origins are
    split into chunks which are processed in a process pool; one chunk is run
    in the parent process first to print a runtime estimate.

    Args:
        graph: Multigraph whose nodes carry a ``"population"`` attribute and
            whose edges carry the ``weight`` attribute.
        cpu_cores: Number of worker processes when ``SLURM_JOB_CPUS_PER_NODE``
            is not set in the environment.
        jobs_per_cpu: Number of origin chunks created per worker process.
        cutoff: Maximum path weight considered. ``"default"`` selects 60 km for
            ``weight="length"`` and 60 minutes for ``weight="travel_time"``.
        weight: Edge attribute used as path weight.

    Returns:
        ``graph`` with the ``"load"`` edge attribute set.

    Raises:
        ValueError: If ``cutoff="default"`` is combined with an unknown
            ``weight``, or if ``ncpus * jobs_per_cpu`` is not positive.
    """
    # NOTE(review): the worker is published as a module-level global,
    # presumably so the pool workers can reference it by name instead of
    # serialising the whole closure - confirm before removing.
    global load_to_orig

    if cutoff == "default":
        if weight == "length":
            cutoff = 60 * 1000  # 60 km
        elif weight == "travel_time":
            cutoff = 60 * 60  # 60 mins
        else:
            # Previously the string "default" was handed to Dijkstra here.
            raise ValueError(
                f"No default cutoff known for weight {weight!r}; "
                "pass an explicit cutoff."
            )

    if graph.number_of_nodes() == 0:
        # Nothing to route; avoids a division by zero in the ETA estimate.
        return graph

    # NOTE(review): the fit is always requested for "travel_time", even when
    # weight == "length" - confirm that this is intended.
    max_bin, popt_exp, popt_lin = gmp.mobility_fit_params(
        "data/MOP-data/mobility/", "travel_time", bincount=250
    )

    def mobility_fit(x):
        # Piecewise fit: exponential above max_bin, linear up to max_bin.
        if x > max_bin:
            return gmp.exp_func(x, *popt_exp)
        return gmp.lin_func(x, *popt_lin)

    population = nx.get_node_attributes(graph, "population")
    edge_keys = list(graph.edges(keys=True))
    zeros = np.zeros(len(edge_keys), dtype=np.float64)

    def load_to_orig(origin_list):
        # Returns the load contributed by the given origins, as an array
        # ordered like ``graph.edges(keys=True)``.
        loaddict = dict(zip(edge_keys, zeros))
        for o in origin_list:
            dist, pathes = nx.single_source_dijkstra(
                graph, o, weight=weight, cutoff=cutoff
            )
            No = population[o]
            denominator = sum(
                mobility_fit(length) * population[node]
                for node, length in dist.items()
            )
            if denominator == 0:
                # No attracting population within reach: no flow leaves this
                # origin (and dividing would fail or produce NaN).
                continue
            for d, path in pathes.items():
                Nd = population[d]
                fod = No * Nd * mobility_fit(dist[d]) / denominator
                # NOTE(review): flow is always booked on edge key 0, also when
                # parallel edges exist between i and j.
                for i, j in zip(path[:-1], path[1:]):
                    loaddict[i, j, 0] += fod
        return np.array(list(loaddict.values()))

    # cpu_count() will report /all/ CPUs on the node
    # This is not what we should use.
    ncpus = multiprocess.cpu_count()
    print("detected {} cores".format(ncpus))

    # This will report the number of CPU cores SLURM
    # has allocated us. This is the correct number to
    # pass to Pool()
    try:
        slurm_cpus = os.environ["SLURM_JOB_CPUS_PER_NODE"]
    except KeyError:
        ncpus = cpu_cores
        print(f"Not running under Slurm, setting ncpus to {ncpus}")
    else:
        # The value may look like "16", "16(x2)" or "16(x2),8"; use the CPU
        # count of the first entry instead of failing in int().
        ncpus = int(slurm_cpus.split(",")[0].split("(")[0])
        print("my Slurm allocation is {} cores".format(ncpus))

    node_arr = np.array(list(graph.nodes()))
    input_list_len = ncpus * jobs_per_cpu
    if input_list_len < 1:
        raise ValueError("ncpus * jobs_per_cpu must be a positive integer")
    vertices_per_proc = len(node_arr) // input_list_len
    evenly_split = input_list_len * vertices_per_proc

    node_list_proc = node_arr[:evenly_split].reshape(
        input_list_len, vertices_per_proc
    )
    leftover = node_arr[evenly_split:]

    # One chunk is processed up front in this process to estimate the runtime.
    # If the nodes do not divide evenly, that chunk is the (shorter) leftover
    # and ALL regular chunks go to the pool; otherwise the last regular chunk
    # is used and only the remaining ones go to the pool.
    if len(leftover) > 0:
        nodes_last_col = leftover
        pool_chunks = node_list_proc
    else:
        nodes_last_col = node_list_proc[-1]
        pool_chunks = node_list_proc[:-1]

    # eta
    start = time.time()
    load_last_col = load_to_orig(nodes_last_col)
    end = time.time()
    est_time = round(
        (end - start) * len(graph.nodes()) / (ncpus * len(nodes_last_col))
    )
    print(f"Estimated time order: O({est_time} s)")

    # start multiprocessing
    start = time.time()
    if pool_chunks.size > 0:
        with Pool(processes=ncpus) as pool:
            load_arr = np.sum(pool.map(load_to_orig, pool_chunks), axis=0)
    else:
        # Every origin was already handled by the ETA run above.
        load_arr = 0.0

    nx.set_edge_attributes(
        graph,
        dict(zip(edge_keys, load_arr + load_last_col)),
        "load",
    )
    end = time.time()
    print("Time:", round(end - start, 1), "seconds")
    return graph


# %%
""""
west, east = 6.75, 7.35
south, north = 50.35, 50.65


date = "2021-01-02"
region = rn.RoadNetwork(north, east, south, west, date, "all")
region.loads("travel_time")

H = region.graph
H = parallel_load(H, 40, cutoff=None)
pload = nx.get_edge_attributes(H, "parallel_load")
load = nx.get_edge_attributes(H, "load")

# %%
#plt.scatter(load.values(), pload.values())
# %%
"""