From 210e7ebd92ced4a65bd51639dcc931c856613e26 Mon Sep 17 00:00:00 2001
From: Ari Brown
Date: Tue, 28 Nov 2023 11:12:40 -0500
Subject: [PATCH] Improve run_cluster.py; touch up the Dask test; upgrade the
 pyarrow dependency for a security patch

---
 dask_test.py   | 110 +++++++++++++++++++++++++++++++++++--------------
 pyproject.toml |   4 +-
 run_cluster.py |  25 +++++++----
 3 files changed, 99 insertions(+), 40 deletions(-)

diff --git a/dask_test.py b/dask_test.py
index 2e2d938..d1a2a9c 100644
--- a/dask_test.py
+++ b/dask_test.py
@@ -2,64 +2,112 @@ import sys
 import minerva
 from minerva.timing import Timing
 import dask
+import dask.distributed
 from dask.distributed import Client
+import dask.dataframe as dd
+import time
 
 m = minerva.Minerva("hay")
 
 print(f"connecting to {sys.argv[1]}")
 client = Client(sys.argv[1])
+#client = Client()  # uncomment to run against a throwaway local cluster instead
+
+# hard-coded manifest of parquet result files from a previous athena query
+manifest_files = ['s3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_0934fdc9-1bc7-4dae-a371-c3f58f3b31fc',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_7011b49a-bf6c-42d0-9852-85aa10a3ad37',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8ca98d70-74c2-4d17-b059-83cd25e398c0',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_61265893-e992-4026-8b69-e60a4641dd10',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_d4318a64-7fc0-4b59-8f85-8b1790e72a70',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_4e2e19a8-b360-49fe-9b82-f66a1e23ad3e',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_c5a77760-f078-432c-aaf5-6d99fb0cee0c',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_db5f90f3-9fc0-4b86-98da-d840bb9b5423',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2472c560-1113-448b-ae18-e23032d3f3d8',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_551944ef-47f4-475d-a575-e209ca1ee7b4',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_67569166-18f9-40bd-9845-20f069f8dc8a',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8a81dca0-767d-43bf-b0aa-a087f98d41c5',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_c9dd6a02-0f92-4202-ba70-77fda02d4acf',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_9f094026-03f4-4ec2-a348-0498eeb6b04d',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2c20c942-1e2c-4f37-86ec-4adce83edbea',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_237d6c26-8372-4584-ae15-9f693d2295a6',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_cf41cbf5-eb46-4bb9-b904-f1518affbefa',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_420f8434-5a71-4af8-91b5-d5364c941ded',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_f155616d-39bc-483e-b19a-56da9fbae685',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_ef8a7770-43b7-44bb-9dde-f92ed8faae9b',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8aa7de20-91ee-4ada-8f48-c7a5b313572f',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_7c265580-ada5-4b94-b818-5cebdb4bb6c6',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_13562caf-4578-4e36-83fe-8b7a5eabc7e8',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_b0f80cd3-4aa5-40ac-a0c8-632c763f8969',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2f1e190b-621c-4638-91f1-e961ba674714',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_5dcd43e4-67bd-41ba-b3cd-7e8f8da9b1f9',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_293c6916-4f22-4ca0-a240-078ebe48368b',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_80e8c47e-f254-47c4-80af-d744885e8174',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_6b1ba112-0545-4c3f-9321-dbf597d98bc1',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_0b3dd1a8-72ef-4852-bf1d-f7091629d3b6']
+
+def full_s3(obj):
+    # build a full s3:// url from a bucket listing object
+    return "s3://" + obj.bucket_name + "/" + obj.key
 
 try:
-    # Practice with a big array
-    # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
-
-    #import numpy as np
-    #import dask.array as da
-    import dask.dataframe as dd
-    import time
-
-    # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
-
-    athena = m.athena("s3://haystac-pmo-athena/")
+    athena = m.athena("s3://haystac-pmo-athena/")
+    records = 1_000_000_000
 
     # Iteratively load files and scatter them to the cluster
-    with Timing("athena query"):
-        query = athena.execute("select * from trajectories.kitware limit 1000000", format='csv')
-        query.finish()
-        df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
+    with Timing("read parquet from athena"):
+        # Delta Table
+        #files = m.s3.ls("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/")
+        #parqs = [full_s3(o) for o in files if o.key.endswith('.parquet')]
 
-        #query = athena.query("select * from trajectories.kitware limit 100000000")
-        #df = dd.read_parquet(query.manifest_files())
-        #df = query.distribute_results(client, size=10000)
+        # 20 partitions at a time, concat 20 at a time
+        #df = dd.read_parquet(parqs[0:20])#.astype({"agent": "category",
+        #                                 #         "timestamp": "datetime64[ns, UTC]"})
+        #for i in range(20, len(parqs), 20):
+        #    new = dd.read_parquet(parqs[i:i + 20])#.astype({"agent": "category",
+        #                                          #         "timestamp": "datetime64[ns, UTC]"})
+        #    df = dd.concat([df, new])  # note: dask has no DataFrame.concat method; use module-level dd.concat
+
+        # 20 partitions at a time, concat all at once
+        #df = dd.concat([dd.read_parquet(parqs[i:i + 20], engine='fastparquet') for i in range(0, len(parqs), 20)])
+
+        # All partitions at once
+        #df = dd.read_parquet(parqs, engine='fastparquet')
+
+        # Single CSV file
+        #query = athena.execute(f"select * from trajectories.kitware limit {records}", format="csv")
+        #query.finish()
+        #df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
+
+        # Unload to Parquet files
+        #query = athena.query(f"select * from trajectories.kitware limit {records}")
+        #df = dd.read_parquet(query.manifest_files())
+        df = dd.read_parquet(manifest_files)
 
     print("distributed")
 
     with Timing("partitioning"):
-        #df = df.categorize('agent')
-        #df['agent'] = df['agent'].cat.as_ordered()
-        #partitions = df.describe(include=['category']).compute().iloc[0][0]
-        #df = df.set_index('agent', npartitions=partitions)
-
         divisions = list(range(0, 10001))
         df = df.set_index('agent', divisions=divisions)
 
     with Timing("persisting"):
         dp = df.persist()
 
-    import IPython
-    IPython.embed()
+    #import IPython
+    #IPython.embed()
 
     with Timing("count()"):
-        print(dp.count().visualize(filename='count.svg'))
+        print(dp.count().compute())
 
     with Timing("mean latitude"):
-        print(dp.groupby(dp.index).latitude.mean().visualize(filename='latitude.svg'))
+        print(dp.groupby(dp.index).latitude.mean().compute())
 
-    with Timing("mean longitude"):
-        print(dp.groupby(dp.index).longitude.mean().visualize(filename='longitude.svg'))
+    with Timing("count() again"):  # re-count the already-persisted frame to compare against the first pass
+        print(dp.count().compute())
+
+    #with Timing("mean longitude"):
+    #    print(dp.groupby(dp.index).longitude.mean().compute())
 
 finally:
     ########## FIN #######################
+    print("closing client")
     client.close()
-    print("end")
-    input()
+    #print("end")
+    #input()
diff --git a/pyproject.toml b/pyproject.toml
index 6db34e9..f0086c3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -18,7 +18,7 @@ minerva-console = "minerva.console:main"
 [tool.poetry.dependencies]
 python = ">3.9, <3.12"
 boto3 = "^1.26.161"
-pyarrow = "^12.0.1"
+pyarrow = "^14.0.1"  # security patch: CVE-2023-47248 fixed in 14.0.1
 joblib = "^1.1.0"
 fabric = "^3.0.0"
-s3fs = "2023.9.2"
+s3fs = "2023.6.0"
diff --git a/run_cluster.py b/run_cluster.py
index 164848f..848fe08 100644
--- a/run_cluster.py
+++ b/run_cluster.py
@@ -4,9 +4,12 @@ from minerva.pier import Pier
 
 ########### PREP ############################
 
+# modin base AMI (built on the dask base): dask on ubuntu 22.04 x86
+DASK_BASE = "ami-0a8b05658a4f00e52"
+
 def worker(pier, n):
-    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
-                        instance_type = "t3.medium",
+    mach = pier.machine(ami = DASK_BASE,
+                        instance_type = "r5.xlarge",
                         username = "ubuntu",
                         name = f"dask-worker-{n}",
                         variables = {"type": "worker",
@@ -15,8 +18,8 @@ def worker(pier, n):
     return mach
 
 def scheduler(pier):
-    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
-                        instance_type = "t3.medium",
+    mach = pier.machine(ami = DASK_BASE,
+                        instance_type = "r5.xlarge",
                         username = "ubuntu",
                         name = f"dask-scheduler",
                         variables = {"type": "scheduler"},
@@ -35,9 +38,17 @@ pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public
 
 cluster = pier.cluster(scheduler, worker, num_workers=int(sys.argv[1]))
 cluster.start()
 
-print(cluster.public_location)
-print("press enter to terminate cluster")
-input()
+print()
+print(f"dashboard: http://{cluster.scheduler.public_ip}:8787/")
+print(f"cluster: {cluster.public_location}")
+print()
+
+print("type `exit()` to terminate the cluster")
+print()
+
+import IPython
+IPython.embed()
+
 cluster.terminate()
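
Note on the pattern dask_test.py now exercises: read Parquet into a Dask DataFrame, set_index('agent') with explicit divisions so each agent's rows land in a predictable partition, persist, then run grouped aggregations. Below is a minimal local sketch of that flow. Everything in it is a stand-in: LocalCluster replaces the EC2 cluster that run_cluster.py provisions, and the synthetic frame replaces the Athena Parquet results.

# Minimal, self-contained sketch of the partition-then-aggregate pattern
# from dask_test.py, runnable on a laptop. Synthetic data; LocalCluster is
# a stand-in for the EC2 cluster.
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=2))

    # Toy trajectories: 10 agents with lat/lon samples.
    pdf = pd.DataFrame({
        "agent": [i % 10 for i in range(10_000)],
        "latitude": [40.0 + (i % 97) / 1000 for i in range(10_000)],
        "longitude": [-74.0 - (i % 97) / 1000 for i in range(10_000)],
    })
    df = dd.from_pandas(pdf, npartitions=4)

    # Explicit divisions: one partition per agent id, mirroring
    # divisions = list(range(0, 10001)) in dask_test.py at small scale.
    df = df.set_index("agent", divisions=list(range(0, 11)))

    dp = df.persist()  # materialize partitions on the workers

    print(dp.count().compute())                            # rows per column
    print(dp.groupby(dp.index).latitude.mean().compute())  # per-agent mean

    client.close()

Passing explicit divisions skips the sampling pass that set_index would otherwise run to pick partition boundaries; the shuffle still happens, but the boundaries are deterministic, which is presumably why dask_test.py precomputes list(range(0, 10001)) rather than letting Dask infer them.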