Source code for running.history_runner

import datetime
import json
import logging
import multiprocessing as mp
import os

from base.cgraph import MyGraph
from tqdm import tqdm

from crawlers.cbasic import Crawler
from crawlers.declarable import declaration_to_filename, filename_to_declaration
from graph_io import GraphCollections
from running.knapsack.jobs_balancer import GreedyBalancer
from running.knapsack.jobs_runner import JobProcess, JobsRunner
from running.merger import ResultsMerger
from running.metrics import Metric
from running.runner import CrawlerRunner


class SmartCrawlersRunner:
    """
    Runs several crawlers and measures several metrics for a given graph.
    Saves the measurement history to disk. Can run several instances of a crawler in parallel.
    The step sequence grows exponentially and does not depend on the graph.
    """
    def __init__(self, graph_full_names, crawler_decls, metric_decls, budget: int = -1):
        self.graph_full_names = graph_full_names
        self.crawler_decls = crawler_decls
        self.metric_decls = metric_decls
        self.budget = budget

    def run_adaptive(self, n_instances=10, max_cpus: int = mp.cpu_count(),
                     max_memory: float = 0.7 * os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024. ** 3)):
        fn_decl = {}  # filename -> original crawler/metric declaration
        for cd in self.crawler_decls:
            c = Crawler.from_declaration(cd, graph=None)
            f = declaration_to_filename(c.declaration)
            fn_decl[f] = cd
        for md in self.metric_decls:
            m = Metric.from_declaration(md, graph=None)
            f = declaration_to_filename(m.declaration)
            fn_decl[f] = md

        # Get missing combinations
        crm = ResultsMerger(self.graph_full_names, self.crawler_decls, self.metric_decls,
                            self.budget, n_instances=n_instances)
        missing = crm.missing_instances()  # g -> c -> m -> count
        if len(missing) == 0:
            logging.info("No missing experiments found")
            return

        print("Missing configs to be run:")
        total_runs = 0
        for g, cmc in missing.items():
            for cd, mc in cmc.items():
                count = max(mc.values())
                ms = list(mc.keys())
                print(f"[{count}] {g}, {cd}, {ms}")
                total_runs += count
                cmc[cd] = (ms, count)
        # Now g -> c -> ([metrics], count)

        # Create jobs
        def run(graph_full_name=None, crawler_fn=None, metric_fns=None, **kwargs):
            g = GraphCollections.get(*graph_full_name)
            mds = [fn_decl[m] for m in metric_fns]
            cds = [fn_decl[crawler_fn]]
            chr = CrawlerHistoryRunner(g, cds, mds, self.budget)
            chr.run()

        jobs = []
        for g, cmc in missing.items():
            for crawler, (metrics, count) in cmc.items():
                cpu = mp.cpu_count() / 2 if 'GNNPredictor' in crawler else 1
                ram = 1400  # TODO estimate!
                # job_id = ','.join([str(g), c, str(ms)])
                job_id = None
                for _ in range(count):
                    jobs.append(JobProcess(
                        cpu=cpu, ram=ram, id=job_id, target=run,
                        kwargs={
                            'graph_full_name': g,
                            'crawler_fn': crawler,
                            'metric_fns': metrics,
                            'budget': self.budget}))

        runner = JobsRunner(max_cpus, max_memory * 1024, GreedyBalancer)
        runner.run(jobs)
        print(runner.history)
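
# ----------------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the original module). It shows how
# SmartCrawlersRunner is intended to be driven: pass graph full names together with
# crawler and metric declarations, and run_adaptive() schedules only the experiment
# instances still missing on disk. The declaration values below are hypothetical
# placeholders; their exact format is defined in crawlers.declarable.
#
#     graph_full_names = [...]   # names resolvable by GraphCollections.get()
#     crawler_decls = [...]      # crawler declarations (see crawlers.cbasic)
#     metric_decls = [...]       # metric declarations (see running.metrics)
#     runner = SmartCrawlersRunner(graph_full_names, crawler_decls, metric_decls, budget=1000)
#     runner.run_adaptive(n_instances=8, max_cpus=4, max_memory=8.0)  # memory in GB, judging by the default
# ----------------------------------------------------------------------------------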

class CrawlerHistoryRunner(CrawlerRunner):
    """
    Runs several crawlers and measures several metrics for a given graph.
    Saves the measurement history to disk. Can run several instances of a crawler in parallel.
    The step sequence grows exponentially and does not depend on the graph.

    Functions:
    ----------
    * run - Run the given crawlers and measure the metrics. In the end, the measurements are saved to files.

    NOTES:
    ------
    * The step sequence at which metrics are computed must be the same for all runs,
      otherwise the results cannot be merged correctly.
    """
    def __init__(self, graph: MyGraph, crawler_decls, metric_decls, budget: int = -1, step: int = -1,
                 omit_nones=True):
        """
        :param graph: graph to run on
        :param crawler_decls: list of crawler declarations to run. Crawler declarations are
          instantiated when run() is called
        :param metric_decls: list of metric declarations to compute at each step. A metric should be
          a callable crawler -> float and have a name
        :param budget: maximal number of nodes to be crawled, by default the whole graph
        :param step: compute metrics every `step` steps, by default an exponential step sequence is used
        :param omit_nones: if True (default), a metric series whose values are all None is not saved;
          set to False to save all metric values
        """
        super().__init__(graph, crawler_decls=crawler_decls, metric_decls=metric_decls, budget=budget, step=step)

        self._init_semaphore = mp.Semaphore(1)
        self._save_semaphore = mp.Semaphore(1)
        self._omit_nones = omit_nones

    def _save_history(self, crawler_metric_seq, step_seq):
        pbar = tqdm(total=len(crawler_metric_seq), desc='Saving history')
        for crawler in crawler_metric_seq.keys():
            metric_seq = crawler_metric_seq[crawler]
            for metric in metric_seq.keys():
                metric_value = dict(zip(step_seq, metric_seq[metric]))
                # If all values are None, omit saving
                if self._omit_nones and all(v is None for v in metric_value.values()):
                    continue
                path_pattern = ResultsMerger.names_to_path(
                    self.graph.full_name,
                    declaration_to_filename(crawler.declaration),
                    declaration_to_filename(metric.declaration),
                    self.budget)
                directory = os.path.dirname(path_pattern)
                if not os.path.exists(directory):
                    os.makedirs(directory)
                path = ResultsMerger.next_file(path_pattern.parent)
                with open(path, 'w') as f:
                    # NOTE: will raise an error for numpy values
                    json.dump(metric_value, f, indent=1)
            pbar.update(1)
        pbar.close()

    def run(self):
        """
        Run crawlers and measure metrics. In the end, the measurements are saved.
        Note: this function can be called in parallel processes.
        """
        with self._init_semaphore:  # To ensure stats reading/calculation only once in case of parallel run
            crawlers, metrics, batch_generator = self._init_runner()

        pbar = tqdm(total=self.budget, desc='Running iterations', position=0, leave=True)

        step = 0
        step_seq = [0]  # starting from point 0
        crawler_metric_seq = dict([(c, dict([(m, [0]) for m in metrics])) for c in crawlers])
        for batch in batch_generator:
            step += batch
            step_seq.append(step)
            for crawler in crawlers:
                crawler.crawl_budget(batch)
                for metric in metrics:
                    # Calculate metric for the crawler
                    value = metric(crawler)
                    crawler_metric_seq[crawler][metric].append(value)
            pbar.update(batch)
        pbar.close()  # closing progress bar

        with self._save_semaphore:  # When saving history, it needs to know the number of files in the directory
            self._save_history(crawler_metric_seq, step_seq)
        logging.info("Finished running at %s" % datetime.datetime.now())
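
# ----------------------------------------------------------------------------------
# Usage sketch (added for illustration; not part of the original module). For a single
# graph, CrawlerHistoryRunner can be used directly in the current process: it runs every
# declared crawler, measures every declared metric at each step of the step sequence,
# and dumps one JSON history file per (crawler, metric) pair. The declaration values
# below are hypothetical placeholders.
#
#     graph = GraphCollections.get(...)                     # load a graph by its full name
#     chr_runner = CrawlerHistoryRunner(graph, crawler_decls, metric_decls, budget=1000)
#     chr_runner.run()                                      # histories are saved under ResultsMerger paths
# ----------------------------------------------------------------------------------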