"""Dask benchmark driver.

Connects to a dask scheduler (address given as argv[1]), loads a parquet
dataset from S3, repartitions it by the 'agent' column, persists it on the
cluster, and prints a series of timed memory-usage / count measurements.
The later repeated measurements presumably time the warm (persisted) path —
TODO confirm with the original author.
"""
import sys
import time

import dask
import dask.dataframe as dd
import dask.distributed
from dask.distributed import Client

import minerva
from minerva.timing import Timing

# Left disabled on purpose in the original script:
# dask.config.set({'distributed.worker.multiprocessing-method': 'fork'})

m = minerva.Minerva("hay")

# Fail with a clear usage message instead of an IndexError when the
# scheduler address is missing.
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} <scheduler-address>")

print(f"connecting to {sys.argv[1]}")
client = Client(sys.argv[1])

manifest_files = ['s3://ari-public-test-data/test1']

try:
    with Timing("read parquets"):
        df = dd.read_parquet(manifest_files, engine='fastparquet')

    with Timing("partitioning"):
        # 10000 partitions: divisions are the boundary values of the new
        # 'agent' index, so range(0, 10001) yields 10000 partitions.
        divisions = list(range(0, 10001))
        df = df.set_index('agent', divisions=divisions)

    with Timing("persisting"):
        # Materialize the repartitioned frame in cluster memory so the
        # measurements below hit persisted data.
        dp = df.persist()

    with Timing("total memory usage"):
        print(dp.memory_usage().compute())

    with Timing("partition usage"):
        print(dp.get_partition(300).memory_usage().compute())

    with Timing("count()"):
        print(dp.count().compute())

    # NOTE(review): the two computations below repeat earlier ones —
    # presumably to compare cold vs. warm timings; kept as-is.
    with Timing("memory usage"):
        print(dp.memory_usage().compute())

    with Timing("count()"):
        print(dp.count().compute())
finally:
    ########## FIN #######################
    print("closing client")
    client.close()