adding dask examples

Ari Brown 2023-11-16 10:33:56 -05:00
parent c0ff6af866
commit fe06b6b808
6 changed files with 149 additions and 18 deletions

dask_test.py (new file)

@@ -0,0 +1,65 @@
import sys

import minerva
from minerva.timing import Timing

import dask
from dask.distributed import Client

m = minerva.Minerva("hay")

print(f"connecting to {sys.argv[1]}")
client = Client(sys.argv[1])

try:
    # Practice with a big array
    # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
    #import numpy as np
    #import dask.array as da
    import dask.dataframe as dd
    import time

    # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
    athena = m.athena("s3://haystac-pmo-athena/")

    # Iteratively load files and scatter them to the cluster
    with Timing("athena query"):
        query = athena.execute("select * from trajectories.kitware limit 1000000", format='csv')
        query.finish()
        df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
        #query = athena.query("select * from trajectories.kitware limit 100000000")
        #df = dd.read_parquet(query.manifest_files())
        #df = query.distribute_results(client, size=10000)
    print("distributed")

    with Timing("partitioning"):
        #df = df.categorize('agent')
        #df['agent'] = df['agent'].cat.as_ordered()
        #partitions = df.describe(include=['category']).compute().iloc[0][0]
        #df = df.set_index('agent', npartitions=partitions)
        divisions = list(range(0, 10001))
        df = df.set_index('agent', divisions=divisions)

    with Timing("persisting"):
        dp = df.persist()

    import IPython
    IPython.embed()

    with Timing("count()"):
        print(dp.count().compute())
    with Timing("mean latitude"):
        print(dp.map_partitions(lambda p: p.groupby(p.index).latitude.mean()).compute())
    with Timing("mean longitude"):
        print(dp.map_partitions(lambda p: p.groupby(p.index).longitude.mean()).compute())
finally:
    ########## FIN #######################
    client.close()
    print("end")
    input()
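The script above leans on minerva's Athena wrapper, but the dask pattern it exercises — set_index with explicit divisions, persist, then per-partition groupbys — stands alone. A minimal sketch with synthetic data (the agent/latitude/longitude column names come from the script; the data and division values are made up for illustration):

import pandas as pd
import dask.dataframe as dd

# Tiny stand-in for the Athena results; `agent` ids are known up front.
pdf = pd.DataFrame({"agent": [0, 1, 0, 2, 1, 2],
                    "latitude": [42.0, 42.1, 42.2, 42.3, 42.4, 42.5],
                    "longitude": [-71.0, -71.1, -71.2, -71.3, -71.4, -71.5]})
df = dd.from_pandas(pdf, npartitions=2)

# Explicit divisions tell dask where partition boundaries fall, so
# set_index shuffles once into a known layout (dask_test.py does the
# same with divisions = list(range(0, 10001))).
df = df.set_index("agent", divisions=[0, 1, 2, 3])
dp = df.persist()

# With the index partitioned by agent, each groupby stays inside one
# partition, so map_partitions avoids a cluster-wide shuffle.
print(dp.map_partitions(lambda p: p.groupby(p.index).latitude.mean()).compute())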

@@ -0,0 +1,24 @@
from minerva.cluster import Cluster
from minerva.pier import Pier

def worker(pier, n=0):
    mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        name = f"dask-client-{n}",
                        variables = {"type": "worker",
                                     "number": n})
    return mach

pier = Pier("hay",
            subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
            sg_groups = ["sg-0f9e555954e863954", # ssh
                         "sg-0b34a3f7398076545", # default
                         "sg-04cd2626d91ac093c"], # dask (8786, 8787)
            iam = "S3+SSM+CloudWatch+ECR",
            key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))

mach = worker(pier)
mach.create()
mach.login()

@@ -19,16 +19,16 @@ class Athena:
     # For when you want to receive the results of something
-    def query(self, sql, params=[]):
-        q = Query(self, sql, params)
+    def query(self, sql, params=[], format='parquet'):
+        q = Query(self, sql, params, format)
         q.run()
         return q

     # For when you want to send a query to run, but there aren't results
     # (like a DML query for creating databases and tables etc)
-    def execute(self, sql, params=[]):
-        e = Execute(self, sql, params)
+    def execute(self, sql, params=[], format=None):
+        e = Execute(self, sql, params, format)
         e.run()
         return e
@@ -38,7 +38,7 @@ class Execute:
     Execute will not return results, but will execute the SQL and return the final state.
     Execute is meant to be used for DML statements such as CREATE DATABASE/TABLE
     """
-    def __init__(self, athena, sql, params=[]):
+    def __init__(self, athena, sql, params=[], format='parquet'):
         self.athena = athena
         self.handler = athena.handler
         self.client = athena.client
@@ -47,6 +47,8 @@ class Execute:
         self.info_cache = None
         self.temps = []
         self.ds = None
+        self.files = None
+        self.format = format

     # The string of the query
@@ -56,10 +58,7 @@ class Execute:
     # Send the SQL to Athena for running
     def run(self):
-        if self.__class__ == Query:
-            config = {"OutputLocation": os.path.join(self.athena.output, "results")}
-        else:
-            config = {"OutputLocation": self.athena.output}
+        config = {"OutputLocation": os.path.join(self.athena.output, "results")}

         if self.params:
             resp = self.client.start_query_execution(QueryString=self.query(),
@@ -102,7 +101,6 @@ class Execute:

 class Query(Execute):
-    DATA_STYLE = 'parquet'

     # Automatically includes unloading the results to Parquet format
     def query(self):
@@ -110,7 +108,7 @@ class Query(Execute):
                            "results",
                            str(random.random()))
         query = f"unload ({self.sql}) to {repr(out)} " + \
-                f"with (format = '{self.DATA_STYLE}')"
+                f"with (format = '{self.format}')"

         return query
@@ -118,19 +116,19 @@ class Query(Execute):
     # the statement)
     # Blocks until the query has finished (because it calls `self.finish()`)
     def manifest_files(self):
+        if self.files:
+            return self.files
+
         status = self.finish()
         if status == "SUCCEEDED":
-            # Track the runtime
-            ms = self.info_cache['Statistics']['TotalExecutionTimeInMillis']
-            self.runtime = datetime.timedelta(seconds=ms / 1000)
-
             # Because we're using `UNLOAD`, we get a manifest of the files
             # that make up our data.
             manif = self.info_cache['Statistics']['DataManifestLocation']
             files = self.handler.s3.read(manif).split("\n")
             files = [f.strip() for f in files if f.strip()] # filter empty
+            self.files = files
             return files
         else:
             print("Error")
@@ -152,7 +150,7 @@ class Query(Execute):
         return self.ds

-    def distribute_results(self, client):
+    def distribute_results(self, client, size=10000):
         import dask.dataframe as dd
         import pandas as pd
@@ -163,7 +161,8 @@ class Query(Execute):
         print(f"{len(self.manifest_files())} files in manifest")
         for fn in self.manifest_files():
             print(f"reading {fn}...")
-            df = pd.read_parquet(fn)
+            df = dd.from_pandas(pd.read_parquet(fn), chunksize=100000)
+            print(df._meta)
             print("\tloaded")
             future = client.scatter(df)
             print("\tscattered")

@@ -93,7 +93,7 @@ class Machine:
         # Final wait, now that the server is up and running -- need
         # some time for daemons to start
-        time.sleep(25)
+        time.sleep(35)

         self.ready = True

@@ -21,3 +21,4 @@ boto3 = "^1.26.161"
 pyarrow = "^12.0.1"
 joblib = "^1.1.0"
 fabric = "^3.0.0"
+s3fs = "2023.9.2"
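s3fs is what lets pandas and dask resolve s3:// URLs directly, which the dd.read_csv and dd.read_parquet calls in dask_test.py depend on. A minimal sketch (the glob pattern is illustrative, not a path from this repo):

import dask.dataframe as dd

# Requires s3fs to be installed on both the client and the workers.
df = dd.read_parquet("s3://haystac-pmo-athena/results/*")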

run_cluster.py (new file)

@@ -0,0 +1,42 @@
import minerva
from minerva.pier import Pier

########### PREP ############################

def worker(pier, n):
    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        name = f"dask-worker-{n}",
                        variables = {"type": "worker",
                                     "number": n},
                        disk_size = 512)
    return mach

def scheduler(pier):
    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
                        instance_type = "t3.medium",
                        username = "ubuntu",
                        name = "dask-scheduler",
                        variables = {"type": "scheduler"},
                        disk_size = 32)
    return mach

########## CLUSTER ##########################

m = minerva.Minerva("hay")
pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
              sg_groups = ["sg-0f9e555954e863954", # ssh
                           "sg-0b34a3f7398076545", # default
                           "sg-04cd2626d91ac093c"], # dask (8786, 8787)
              key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
              iam = "Minerva")

cluster = pier.cluster(scheduler, worker, num_workers=5)
cluster.start()

print(cluster.public_location)
print("press enter to terminate cluster")
input()
cluster.terminate()
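Once cluster.start() returns, the printed cluster.public_location is the scheduler address that dask_test.py expects as sys.argv[1]. A usage sketch with a made-up address:

from dask.distributed import Client

# The address is illustrative; use the value printed by cluster.public_location,
# e.g. run: python dask_test.py tcp://203.0.113.10:8786
client = Client("tcp://203.0.113.10:8786")
print(client.scheduler_info()["workers"])  # one entry per dask-worker machine
client.close()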