import minerva
from minerva.timing import Timing
from dask.distributed import Client
import dask

########### PREP ############################

def worker(pier, n):
    mach = pier.machine(ami = "ami-0399a4f70ca684620",  # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        name = f"dask-worker-{n}",
                        variables = {"type": "worker", "number": n},
                        disk_size = 512)
    return mach

def scheduler(pier):
    mach = pier.machine(ami = "ami-0399a4f70ca684620",  # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        disk_size = 32,
                        name = "dask-scheduler",
                        variables = {"type": "scheduler"})
    return mach

########## CLUSTER ##########################

m = minerva.Minerva("hay")

pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1",  # project-subnet-public1-us-east-1a
              sg_groups = ["sg-0f9e555954e863954",     # ssh
                           "sg-0b34a3f7398076545",     # default
                           "sg-04cd2626d91ac093c"],    # dask (8786, 8787)
              key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
              iam = "S3+SSM+CloudWatch+ECR")

cluster = pier.cluster(scheduler, worker, num_workers=2)
cluster.start()

########## USAGE ########################

try:
    # Connect a Dask client to the scheduler's public address.
    client = Client(cluster.public_location)
    print(client)

    # Run the Athena query and distribute its results across the cluster.
    athena = m.athena("s3://haystac-pmo-athena/")
    query = athena.query("select * from trajectories.basline where agent < 100")
    df = query.distribute_results(client)

    # Time a few basic operations on the distributed dataframe.
    with Timing("persisting"):
        dp = df.persist()

    with Timing("count()"):
        print(dp.count())

    with Timing("mean latitude"):
        print(dp.latitude.mean().compute())

    with Timing("mean longitude"):
        print(dp.longitude.mean().compute())

finally:
    ########## FIN #######################
    cluster.terminate()