# minerva/cluster/ec2_cluster.py

import configparser
import contextlib
import os

import dask.dataframe as dd
from dask.distributed import Client
from dask_cloudprovider.aws import EC2Cluster

from minerva.timing import Timing
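
# Timing (from minerva.timing) is used below as a context manager that labels each
# stage and reports how long it took (behaviour assumed from its usage in this script).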
def get_aws_credentials():
    """Read the 'default' profile from ~/.aws/credentials and return it as env-var-style keys."""
    parser = configparser.RawConfigParser()
    parser.read(os.path.expanduser('~/.aws/credentials'))
    credentials = parser.items('default')
    all_credentials = {key.upper(): value for key, value in credentials}
    with contextlib.suppress(KeyError):
        all_credentials["AWS_REGION"] = all_credentials.pop("REGION")
    return all_credentials
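
# get_aws_credentials() returns env-var-style keys, e.g. (shape assumed from the
# standard ~/.aws/credentials layout):
#   {'AWS_ACCESS_KEY_ID': '...', 'AWS_SECRET_ACCESS_KEY': '...', 'AWS_REGION': '...'}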

# Environment variables passed to every cluster node; the Dask Docker image used on
# the workers pip-installs anything listed in EXTRA_PIP_PACKAGES at startup, here
# s3fs so the workers can read Parquet directly from S3.
env = {}  # get_aws_credentials()
env['EXTRA_PIP_PACKAGES'] = 's3fs'
cluster = EC2Cluster(
    env_vars=env,
    n_workers=2,
    instance_type='m5.large',
    subnet_id="subnet-05eb26d8649a093e1",      # project-subnet-public1-us-east-1a
    security_groups=["sg-0f9e555954e863954",   # ssh
                     "sg-0b34a3f7398076545",   # default
                     "sg-04cd2626d91ac093c"],  # dask (8786, 8787)
    ami="ami-0399a4f70ca684620",
    key_name="Ari-Brown-HAY",
    security=False,                            # no TLS on cluster communications
    iam_instance_profile={'Name': 'Minerva'},
)
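
# EC2Cluster boots one scheduler instance plus the two workers in the subnet above;
# Client(cluster) connects this process to that remote scheduler.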
client = Client(cluster)
manifest_files = ['s3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_90fc7ac6-1be8-42a3-8485-f5fda039a23b']
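
# Workflow: read the Athena result Parquet from S3, set the index to 'agent' across
# 10,000 partitions, persist the frame on the workers, then time a few aggregations.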
try:
    with Timing("read parquet from athena"):
        df = dd.read_parquet(manifest_files)
        print("distributed")
    with Timing("partitioning"):
        divisions = list(range(0, 10001))
        df = df.set_index('agent', divisions=divisions)
    with Timing("persisting"):
        dp = df.persist()
    with Timing("memory usage"):
        print(dp.get_partition(400).memory_usage())
    with Timing("count()"):
        print(dp.count().compute())
    with Timing("memory usage"):
        print(dp.get_partition(400).memory_usage())
    with Timing("mean latitude"):
        print(dp.groupby(dp.index).latitude.mean().compute())
    with Timing("count()"):
        print(dp.count().compute())
finally:
    ########## FIN #######################
    print("closing client")
    client.close()
    cluster.close()