diff --git a/dask_test.py b/dask_test.py
new file mode 100644
index 0000000..a38abc6
--- /dev/null
+++ b/dask_test.py
@@ -0,0 +1,65 @@
+import sys
+import minerva
+from minerva.timing import Timing
+import dask
+from dask.distributed import Client
+
+m = minerva.Minerva("hay")
+
+print(f"connecting to {sys.argv[1]}")
+client = Client(sys.argv[1])
+
+try:
+    # Practice with a big array
+    # https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
+
+    #import numpy as np
+    #import dask.array as da
+    import dask.dataframe as dd
+    import time
+
+    # https://stackoverflow.com/questions/43796774/loading-local-file-from-client-onto-dask-distributed-cluster
+
+    athena = m.athena("s3://haystac-pmo-athena/")
+
+    # Iteratively load files and scatter them to the cluster
+    with Timing("athena query"):
+        query = athena.execute("select * from trajectories.kitware limit 1000000", format='csv')
+        query.finish()
+        df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation'])
+
+        #query = athena.query("select * from trajectories.kitware limit 100000000")
+        #df = dd.read_parquet(query.manifest_files())
+        #df = query.distribute_results(client, size=10000)
+        print("distributed")
+
+    with Timing("partitioning"):
+        #df = df.categorize('agent')
+        #df['agent'] = df['agent'].cat.as_ordered()
+        #partitions = df.describe(include=['category']).compute().iloc[0][0]
+        #df = df.set_index('agent', npartitions=partitions)
+
+        divisions = list(range(0, 10001))
+        df = df.set_index('agent', divisions=divisions)
+
+    with Timing("persisting"):
+        dp = df.persist()
+
+    import IPython
+    IPython.embed()
+
+    with Timing("count()"):
+        print(dp.count().compute())
+
+    with Timing("mean latitude"):
+        print(dp.map_partitions(lambda p: p.groupby(p.index).latitude.mean()).compute())
+
+    with Timing("mean longitude"):
+        print(dp.map_partitions(lambda p: p.groupby(p.index).longitude.mean()).compute())
+
+finally:
+    ########## FIN #######################
+    client.close()
+    print("end")
+    input()
+
diff --git a/examples/simple_instance.py b/examples/simple_instance.py
new file mode 100644
index 0000000..826704f
--- /dev/null
+++ b/examples/simple_instance.py
@@ -0,0 +1,24 @@
+from minerva.cluster import Cluster
+from minerva.pier import Pier
+
+def worker(pier, n=0):
+    mach = pier.machine(ami = "ami-05a242924e713f80a", # dask on ubuntu 22.04 x86
+                        instance_type = "t3.medium",
+                        username = "ubuntu",
+                        name = f"dask-client-{n}",
+                        variables = {"type": "worker",
+                                     "number": n})
+    return mach
+
+pier = Pier("hay",
+            subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
+            sg_groups = ["sg-0f9e555954e863954", # ssh
+                         "sg-0b34a3f7398076545", # default
+                         "sg-04cd2626d91ac093c"], # dask (8786, 8787)
+            iam = "S3+SSM+CloudWatch+ECR",
+            key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"))
+
+mach = worker(pier)
+mach.create()
+mach.login()
+
diff --git a/minerva/athena.py b/minerva/athena.py
index b2b25a6..b64b093 100644
--- a/minerva/athena.py
+++ b/minerva/athena.py
@@ -19,16 +19,16 @@ class Athena:
 
     # For when you want to receive the results of something
-    def query(self, sql, params=[]):
-        q = Query(self, sql, params)
+    def query(self, sql, params=[], format='parquet'):
+        q = Query(self, sql, params, format)
         q.run()
 
         return q
 
     # For when you want to send a query to run, but there aren't results
     # (like a DML query for creating databases and tables etc)
-    def execute(self, sql, params=[]):
-        e = Execute(self, sql, params)
+    def execute(self, sql, params=[], format=None):
+        e = Execute(self, sql, params, format)
         e.run()
 
         return e
 
@@ -38,7 +38,7 @@ class Execute:
     Execute will not return results, but will execute the SQL and return the final state.
     Execute is meant to be used for DML statements such as CREATE DATABASE/TABLE
     """
-    def __init__(self, athena, sql, params=[]):
+    def __init__(self, athena, sql, params=[], format='parquet'):
         self.athena = athena
         self.handler = athena.handler
         self.client = athena.client
@@ -47,6 +47,8 @@ class Execute:
         self.info_cache = None
         self.temps = []
         self.ds = None
+        self.files = None
+        self.format = format
 
 
     # The string of the query
@@ -56,10 +58,7 @@ class Execute:
 
     # Send the SQL to Athena for running
    def run(self):
-        if self.__class__ == Query:
-            config = {"OutputLocation": os.path.join(self.athena.output, "results")}
-        else:
-            config = {"OutputLocation": self.athena.output}
+        config = {"OutputLocation": os.path.join(self.athena.output, "results")}
 
         if self.params:
             resp = self.client.start_query_execution(QueryString=self.query(),
@@ -102,7 +101,6 @@ class Execute:
 
 
 class Query(Execute):
-    DATA_STYLE = 'parquet'
 
     # Automatically includes unloading the results to Parquet format
     def query(self):
@@ -110,7 +108,7 @@ class Query(Execute):
                            "results", str(random.random()))
 
         query = f"unload ({self.sql}) to {repr(out)} " + \
-                f"with (format = '{self.DATA_STYLE}')"
+                f"with (format = '{self.format}')"
 
         return query
 
@@ -118,19 +116,19 @@ class Query(Execute):
     # the statement)
     # Blocks until the query has finished (because it calls `self.finish()`)
     def manifest_files(self):
+        if self.files:
+            return self.files
+
         status = self.finish()
 
         if status == "SUCCEEDED":
-            # Track the runtime
-            ms = self.info_cache['Statistics']['TotalExecutionTimeInMillis']
-            self.runtime = datetime.timedelta(seconds=ms / 1000)
-
             # Because we're using `UNLOAD`, we get a manifest of the files
             # that make up our data.
             manif = self.info_cache['Statistics']['DataManifestLocation']
             files = self.handler.s3.read(manif).split("\n")
             files = [f.strip() for f in files if f.strip()] # filter empty
+            self.files = files
 
             return files
         else:
             print("Error")
@@ -152,7 +150,7 @@ class Query(Execute):
 
         return self.ds
 
-    def distribute_results(self, client):
+    def distribute_results(self, client, size=10000):
         import dask.dataframe as dd
         import pandas as pd
 
@@ -163,7 +161,8 @@ class Query(Execute):
         print(f"{len(self.manifest_files())} files in manifest")
         for fn in self.manifest_files():
             print(f"reading {fn}...")
-            df = pd.read_parquet(fn)
+            df = dd.from_pandas(pd.read_parquet(fn), chunksize=100000)
+            print(df._meta)
             print("\tloaded")
             future = client.scatter(df)
             print("\tscattered")
diff --git a/minerva/machine.py b/minerva/machine.py
index 13391fd..1044ce4 100644
--- a/minerva/machine.py
+++ b/minerva/machine.py
@@ -93,7 +93,7 @@ class Machine:
 
         # Final wait, now that the server is up and running -- need
         # some time for daemons to start
-        time.sleep(25)
+        time.sleep(35)
 
         self.ready = True
 
diff --git a/pyproject.toml b/pyproject.toml
index 458f1f9..6db34e9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,3 +21,4 @@ boto3 = "^1.26.161"
 pyarrow = "^12.0.1"
 joblib = "^1.1.0"
 fabric = "^3.0.0"
+s3fs = "2023.9.2"
diff --git a/run_cluster.py b/run_cluster.py
new file mode 100644
index 0000000..1350e8c
--- /dev/null
+++ b/run_cluster.py
@@ -0,0 +1,42 @@
+import minerva
+from minerva.pier import Pier
+
+########### PREP ############################
+
+def worker(pier, n):
+    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
+                        instance_type = "t3.medium",
+                        username = "ubuntu",
+                        name = f"dask-worker-{n}",
+                        variables = {"type": "worker",
+                                     "number": n},
+                        disk_size = 512)
+    return mach
+
+def scheduler(pier):
+    mach = pier.machine(ami = "ami-0a3df8365ed416816", # dask on ubuntu 22.04 x86
+                        instance_type = "t3.medium",
+                        username = "ubuntu",
+                        name = f"dask-scheduler",
+                        variables = {"type": "scheduler"},
+                        disk_size = 32)
+    return mach
+
+########## CLUSTER ##########################
+
+m = minerva.Minerva("hay")
+pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public1-us-east-1a
+              sg_groups = ["sg-0f9e555954e863954", # ssh
+                           "sg-0b34a3f7398076545", # default
+                           "sg-04cd2626d91ac093c"], # dask (8786, 8787)
+              key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"),
+              iam = "Minerva")
+
+cluster = pier.cluster(scheduler, worker, num_workers=5)
+cluster.start()
+print(cluster.public_location)
+
+print("press enter to terminate cluster")
+input()
+cluster.terminate()
+
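Usage sketch (not part of the diff): with the minerva/athena.py changes above, the UNLOAD output format is chosen per query instead of via the removed class-level DATA_STYLE, and manifest_files() now caches its result on self.files. A minimal example, reusing the "hay" profile, bucket, and table from dask_test.py:

import minerva

m = minerva.Minerva("hay")
athena = m.athena("s3://haystac-pmo-athena/")

# Query.query() now builds: unload (<sql>) to '<s3 prefix>' with (format = 'csv')
q = athena.query("select * from trajectories.kitware limit 1000", format='csv')

# Blocks until the query finishes; the manifest list is cached, so a
# second call skips the S3 manifest read.
for fn in q.manifest_files():
    print(fn)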