improve run_cluster.py; touch up the dask test; upgrade the pyarrow dependency for a security patch

Ari Brown 2023-11-28 11:12:40 -05:00
parent c6280a3826
commit 210e7ebd92
3 changed files with 99 additions and 40 deletions


@@ -2,64 +2,112 @@ import sys
 import minerva
 from minerva.timing import Timing
 import dask
+import dask.distributed
 from dask.distributed import Client
+import dask.dataframe as dd
+import time
 
 m = minerva.Minerva("hay")
 
 print(f"connecting to {sys.argv[1]}")
 client = Client(sys.argv[1])
+#client = Client()
+
+manifest_files = ['s3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_0934fdc9-1bc7-4dae-a371-c3f58f3b31fc',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_7011b49a-bf6c-42d0-9852-85aa10a3ad37',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8ca98d70-74c2-4d17-b059-83cd25e398c0',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_61265893-e992-4026-8b69-e60a4641dd10',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_d4318a64-7fc0-4b59-8f85-8b1790e72a70',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_4e2e19a8-b360-49fe-9b82-f66a1e23ad3e',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_c5a77760-f078-432c-aaf5-6d99fb0cee0c',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_db5f90f3-9fc0-4b86-98da-d840bb9b5423',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2472c560-1113-448b-ae18-e23032d3f3d8',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_551944ef-47f4-475d-a575-e209ca1ee7b4',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_67569166-18f9-40bd-9845-20f069f8dc8a',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8a81dca0-767d-43bf-b0aa-a087f98d41c5',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_c9dd6a02-0f92-4202-ba70-77fda02d4acf',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_9f094026-03f4-4ec2-a348-0498eeb6b04d',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2c20c942-1e2c-4f37-86ec-4adce83edbea',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_237d6c26-8372-4584-ae15-9f693d2295a6',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_cf41cbf5-eb46-4bb9-b904-f1518affbefa',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_420f8434-5a71-4af8-91b5-d5364c941ded',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_f155616d-39bc-483e-b19a-56da9fbae685',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_ef8a7770-43b7-44bb-9dde-f92ed8faae9b',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_8aa7de20-91ee-4ada-8f48-c7a5b313572f',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_7c265580-ada5-4b94-b818-5cebdb4bb6c6',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_13562caf-4578-4e36-83fe-8b7a5eabc7e8',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_b0f80cd3-4aa5-40ac-a0c8-632c763f8969',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_2f1e190b-621c-4638-91f1-e961ba674714',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_5dcd43e4-67bd-41ba-b3cd-7e8f8da9b1f9',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_293c6916-4f22-4ca0-a240-078ebe48368b',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_80e8c47e-f254-47c4-80af-d744885e8174',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_6b1ba112-0545-4c3f-9321-dbf597d98bc1',
+                  's3://haystac-pmo-athena/results/0.12892323498922598/20231127_175506_00007_3735a_0b3dd1a8-72ef-4852-bf1d-f7091629d3b6']
+
+def full_s3(obj):
+    return "s3://" + obj.bucket_name + "/" + obj.key
+
 try:
-    # Practice with a big array
-    # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
-    #import numpy as np
-    #import dask.array as da
-    import dask.dataframe as dd
-    import time
-    # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
     athena = m.athena("s3://haystac-pmo-athena/")
+    records = 1_000_000_000
 
     # Iteratively load files and scatter them to the cluster
-    with Timing("athena query"):
-        query = athena.execute("select * from trajectories.kitware limit 1000000", format='csv')
-        query.finish()
-        df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
-        #query = athena.query("select * from trajectories.kitware limit 100000000")
+    with Timing("read parquet from athena"):
+        # Delta Table
+        #files = m.s3.ls("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/")
+        #parqs = [full_s3(o) for o in files if o.key.endswith('.parquet')]
+
+        # 20 partitions at a time, concat 20 at a time
+        #df = dd.read_parquet(parqs[0:20])#.astype({"agent": "category",
+        #                                 #         "timestamp": "datetime64[ns, UTC]"})
+        #for i in range(20, len(parqs), 20):
+        #    new = dd.read_parquet(parqs[i:i + 20])#.astype({"agent": "category",
+        #                                          #         "timestamp": "datetime64[ns, UTC]"})
+        #    df = df.concat(new)
+
+        # 20 partitions at a time, concat all at once
+        #df = dd.concat([dd.read_parquet(parqs[i:i + 20], engine='fastparquet') for i in range(0, len(parqs), 20)])
+
+        # All partitions at once
+        #df = dd.read_parquet(parqs, engine='fastparquet')
+
+        # Single CSV file
+        #query = athena.execute(f"select * from trajectories.kitware limit {records}", format="csv")
+        #query.finish()
+        #df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
+
+        # Unload to Parquet files
+        #query = athena.query(f"select * from trajectories.kitware limit {records}")
         #df = dd.read_parquet(query.manifest_files())
-        #df = query.distribute_results(client, size=10000)
+        df = dd.read_parquet(manifest_files)
     print("distributed")
 
     with Timing("partitioning"):
-        #df = df.categorize('agent')
-        #df['agent'] = df['agent'].cat.as_ordered()
-        #partitions = df.describe(include=['category']).compute().iloc[0][0]
-        #df = df.set_index('agent', npartitions=partitions)
        divisions = list(range(0, 10001))
        df = df.set_index('agent', divisions=divisions)
 
     with Timing("persisting"):
        dp = df.persist()
 
-    import IPython
-    IPython.embed()
+    #import IPython
+    #IPython.embed()
 
     with Timing("count()"):
-        print(dp.count().visualize(filename='count.svg'))
+        print(dp.count().compute())
     with Timing("mean latitude"):
-        print(dp.groupby(dp.index).latitude.mean().visualize(filename='latitude.svg'))
-    with Timing("mean longitude"):
-        print(dp.groupby(dp.index).longitude.mean().visualize(filename='longitude.svg'))
+        print(dp.groupby(dp.index).latitude.mean().compute())
+    with Timing("count()"):
+        print(dp.count().compute())
+    #with Timing("mean longitude"):
+    #    print(dp.groupby(dp.index).longitude.mean().compute())
 finally:
     ########## FIN #######################
+    print("closing client")
     client.close()
-    print("end")
-    input()
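Note: the new load path reads the Athena UNLOAD manifest's Parquet objects directly, shuffles once onto fixed divisions, persists, and replaces the old .visualize(filename=...) calls (which only render the task graph to SVG) with .compute(), which actually executes. A minimal sketch of the same pattern without the minerva wrapper, assuming integer agent ids in [0, 10000] (that is what the hard-coded divisions imply) and a placeholder path in place of the 30-entry manifest list:

    import sys
    import dask.dataframe as dd
    from dask.distributed import Client

    client = Client(sys.argv[1])  # scheduler address, as in the script above

    # placeholder path; the script passes the full manifest_files list
    manifest_files = ["s3://bucket/results/part-0.parquet"]
    df = dd.read_parquet(manifest_files)

    # one explicit shuffle: with divisions known up front, the later
    # groupby over the index needs no second shuffle
    df = df.set_index("agent", divisions=list(range(0, 10001)))
    dp = df.persist()  # pin the shuffled frame in worker memory

    print(dp.count().compute())  # materializes, unlike .visualize()
    print(dp.groupby(dp.index).latitude.mean().compute())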

--- a/pyproject.toml
+++ b/pyproject.toml

@@ -18,7 +18,7 @@ minerva-console = "minerva.console:main"
 [tool.poetry.dependencies]
 python = ">3.9, <3.12"
 boto3 = "^1.26.161"
-pyarrow = "^12.0.1"
+pyarrow = "^14.0.1"
 joblib = "^1.1.0"
 fabric = "^3.0.0"
-s3fs = "2023.9.2"
+s3fs = "2023.6.0"

--- a/run_cluster.py
+++ b/run_cluster.py

@@ -4,9 +4,12 @@ from minerva.pier import Pier
 ########### PREP ############################
 
+# modin base (built on dask base)
+DASK_BASE = "ami-0a8b05658a4f00e52" # dask on ubuntu 22.04 x86
+
 def worker(pier, n):
-    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
-                        instance_type = "t3.medium",
+    mach = pier.machine(ami = DASK_BASE,
+                        instance_type = "r5.xlarge",
                         username = "ubuntu",
                         name = f"dask-worker-{n}",
                         variables = {"type": "worker",
@@ -15,8 +18,8 @@ def worker(pier, n):
     return mach
 
 def scheduler(pier):
-    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
-                        instance_type = "t3.medium",
+    mach = pier.machine(ami = DASK_BASE,
+                        instance_type = "r5.xlarge",
                         username = "ubuntu",
                         name = f"dask-scheduler",
                         variables = {"type": "scheduler"},
@@ -35,9 +38,17 @@ pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public
 cluster = pier.cluster(scheduler, worker, num_workers=int(sys.argv[1]))
 cluster.start()
 
-print(cluster.public_location)
-print("press enter to terminate cluster")
-input()
+print()
+print(f"dashboard: http://{cluster.scheduler.public_ip}:8787/")
+print(f"cluster: {cluster.public_location}")
+print()
+print("type `exit()` to terminate the cluster")
+print()
+import IPython
+IPython.embed()
 
 cluster.terminate()
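Note: instead of blocking on input(), the script now prints the scheduler endpoints and drops into an IPython shell, so the cluster stays up until exit(). The dashboard URL uses Dask's default dashboard port 8787; the scheduler itself listens on 8786 by default. A sketch of attaching from a second machine, assuming the default scheduler port and substituting the address the script prints:

    from dask.distributed import Client

    # use the `cluster:` line printed by run_cluster.py
    client = Client("tcp://<scheduler-public-ip>:8786")
    print(client.scheduler_info()["workers"])  # confirm all workers registered
    client.close()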