updated dask cluster example

Ari Brown 2023-10-12 14:59:24 -04:00
parent c2a702fb9d
commit b6b4b4b416


@@ -1,20 +1,23 @@
 from minerva.cluster import Cluster
 from minerva.pier import Pier
+from minerva.timing import Timing
 from dask.distributed import Client
+import dask
 
 ########### PREP ############################
 
 def worker(pier, n):
-    mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86
+    mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86
                         instance_type = "t3.medium",
                         username = "ubuntu",
                         name = f"dask-worker-{n}",
                         variables = {"type": "worker",
-                                     "number": n})
+                                     "number": n},
+                        disk_size = 512)
     return mach
 
 def scheduler(pier):
-    mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86
+    mach = pier.machine(ami = "ami-01f85b935dc9f674c", # dask on ubuntu 22.04 x86
                         instance_type = "t3.medium",
                         username = "ubuntu",
                         name = f"dask-scheduler",
@@ -27,31 +30,56 @@ pier = Pier("hay",
                 subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
                 sg_groups = ["sg-0f9e555954e863954",  # ssh
                              "sg-0b34a3f7398076545",  # default
-                             "sg-04cd2626d91ac093c"], # dask (8786, 8787, ping)
-                iam = "S3+SSM+CloudWatch+ECR",
-                key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
+                             "sg-04cd2626d91ac093c"], # dask (8786, 8787)
+                key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
+                iam = "S3+SSM+CloudWatch+ECR")
 
-cluster = Cluster(pier, scheduler, worker, num_workers=1)
+cluster = Cluster(pier, scheduler, worker, num_workers=5)
 cluster.start()
 
-client = Client(cluster.private_location)
-print(client)
-
 ########## USAGE ########################
 
 try:
+    client = Client(cluster.public_location)
+    print(client)
+
     # Practice with a big array
-    import numpy as np
-    import dask.array as da
-
-    large_array = np.random.rand(10000, 10000)
-    print("oh boy")
-    dask_array = da.from_array(large_array, chunks=(100, 100))
-    dask_array = dask_array.persist()   # non-blocking
-    mean = dask_array.mean().compute()
-    print(mean)
+    # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
+    #import numpy as np
+    #import dask.array as da
+    import dask.dataframe as dd
+    import time
+
+    # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
+    #
+    # Iteratively load files and scatter them to the cluster
+    #
+    # futures = []
+    # for fn in filenames:
+    #     df = pd.read_csv(fn)
+    #     future = client.scatter(df)
+    #     futures.append(future)
+    #
+    # ddf = dd.from_delayed(futures, meta=df)
+
+    # query = athena.query("select * from trajectories")
+    # ddf = query.distribute_results(client)
+
+    with Timing("reading parquet"):
+        df = dd.read_parquet("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/")
+
+    #with Timing("persisting"):
+    #    dp = df.persist()
+    #with Timing("count()"):
+    #    print(dp.count())
+    #with Timing("mean latitude"):
+    #    print(dp.latitude.mean().compute())
+    #with Timing("mean longitude"):
+    #    print(dp.longitude.mean().compute())
 finally:
     ########## FIN #######################
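
For reference, the commented-out block in the new version follows the "load files on the client, scatter them to the workers" pattern from the linked Stack Overflow answer. A minimal standalone sketch of that approach, using only the public pandas, dask.dataframe, and dask.distributed APIs, might look like the code below; the scheduler address and the CSV paths are placeholders (in the example above the client would instead connect to cluster.public_location), not values from this commit.

import glob

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

# Placeholder scheduler address; swap in the real cluster endpoint.
client = Client("tcp://127.0.0.1:8786")

# Read each file locally, then scatter the resulting pandas DataFrame
# so the data ends up resident on the cluster workers.
futures = []
for fn in sorted(glob.glob("data/*.csv")):   # hypothetical local files
    df = pd.read_csv(fn)
    futures.append(client.scatter(df))

# Assemble the scattered pieces into a single dask DataFrame and compute on it.
ddf = dd.from_delayed(futures, meta=df)
print(ddf.head())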