minerva/examples/dask_cluster.py

65 lines
2.1 KiB
Python

import dask
from dask.distributed import Client
from dask.distributed import wait

import minerva
from minerva.timing import Timing
########### PREP ############################
def worker(pier, n):
    """Return Dask worker machine number *n*, created via ``pier.machine``.

    Handed to ``pier.cluster`` below as the per-worker factory.

    Args:
        pier: the pier object (from ``m.pier(...)``) whose ``machine`` method
            is called.
        n: worker number supplied by the caller; embedded in the machine name
            and in the ``variables`` mapping.

    Returns:
        Whatever ``pier.machine`` returns.
    """
    role_vars = {"type": "worker", "number": n}
    return pier.machine(
        ami="ami-0399a4f70ca684620",  # dask on ubuntu 22.04 x86
        instance_type="t3.medium",
        username="ubuntu",
        name=f"dask-worker-{n}",
        variables=role_vars,
        disk_size=512,
    )
def scheduler(pier):
    """Return the Dask scheduler machine, created via ``pier.machine``.

    Handed to ``pier.cluster`` below as the scheduler factory.

    Args:
        pier: the pier object (from ``m.pier(...)``) whose ``machine`` method
            is called.

    Returns:
        Whatever ``pier.machine`` returns.
    """
    mach = pier.machine(ami="ami-0399a4f70ca684620",  # dask on ubuntu 22.04 x86
                        instance_type="t3.medium",
                        username="ubuntu",
                        disk_size=32,
                        # Plain literal: the original f-string had no placeholders.
                        name="dask-scheduler",
                        # NOTE(review): how minerva consumes `variables` is not
                        # visible here; presumably it marks the machine's role.
                        variables={"type": "scheduler"})
    return mach
########## CLUSTER ##########################
m = minerva.Minerva("hay")
pier = m.pier(subnet_id="subnet-05eb26d8649a093e1",  # project-subnet-public1-us-east-1a
              sg_groups=["sg-0f9e555954e863954",   # ssh
                         "sg-0b34a3f7398076545",   # default
                         "sg-04cd2626d91ac093c"],  # dask (8786, 8787)
              key_pair=("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
              iam="S3+SSM+CloudWatch+ECR")
# One scheduler (built by `scheduler`) plus `num_workers` workers (built by `worker`).
cluster = pier.cluster(scheduler, worker, num_workers=2)

try:
    # start() lives inside the try so a failure partway through launching
    # still reaches cluster.terminate() instead of leaving machines running.
    # NOTE(review): assumes terminate() tolerates a partially started cluster.
    cluster.start()

    ########## USAGE ########################
    # Context manager closes the client before the cluster is torn down.
    with Client(cluster.public_location) as client:
        print(client)

        athena = m.athena("s3://haystac-pmo-athena/")
        # NOTE(review): "basline" may be a typo for "baseline" -- confirm the
        # real table name before changing the query.
        query = athena.query("select * from trajectories.basline where agent < 100")
        df = query.distribute_results(client)

        with Timing("persisting"):
            dp = df.persist()
            # With a distributed client, persist() returns immediately; block
            # until the partitions are in memory so this timing is meaningful.
            wait(dp)
        with Timing("count()"):
            # DataFrame.count() is lazy; compute() runs it and yields the numbers.
            print(dp.count().compute())
        with Timing("mean latitude"):
            print(dp.latitude.mean().compute())
        with Timing("mean longitude"):
            print(dp.longitude.mean().compute())
finally:
    ########## FIN #######################
    cluster.terminate()