From 5eb84710818fa32508933cbb48c64c7da3f0d9a4 Mon Sep 17 00:00:00 2001 From: Ari Brown Date: Thu, 21 Dec 2023 14:23:58 -0500 Subject: [PATCH] better redshift support, working on dask stuff --- cluster/ec2_cluster.py | 33 +---------- cluster/run_cluster.py | 12 +++- dask_test.py | 124 ----------------------------------------- minerva/machine.py | 2 +- minerva/redshift.py | 90 ++++++++++++++++++++++++------ 5 files changed, 86 insertions(+), 175 deletions(-) delete mode 100644 dask_test.py diff --git a/cluster/ec2_cluster.py b/cluster/ec2_cluster.py index fe45574..1fffa0b 100644 --- a/cluster/ec2_cluster.py +++ b/cluster/ec2_cluster.py @@ -18,7 +18,7 @@ def get_aws_credentials(): all_credentials["AWS_REGION"] = all_credentials.pop("REGION") return all_credentials -env = get_aws_credentials() +env = {} #get_aws_credentials() env['EXTRA_PIP_PACKAGES'] = 's3fs' cluster = EC2Cluster(env_vars = env, @@ -35,36 +35,7 @@ cluster = EC2Cluster(env_vars = env, client = Client(cluster) -manifest_files = ['s3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_90fc7ac6-1be8-42a3-8485-f5fda039a23b', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_3fd296d8-4d16-4491-a950-d5e4b0fff172', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_19afa54e-ed8f-4da5-a575-a2f7acd02399', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0240d219-11d8-4965-9425-ed2cbe9f0984', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_657a9eee-2db8-4b55-8c2b-c98c67eb2992', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_4fb5f559-ac3e-4d0b-bff6-e90aefcecff7', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_a7b3bdfa-aa06-4c7b-b010-f0c59d925051', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_444110c5-885d-4369-9cbd-bb842383baa7', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_921dc55e-8d94-4f91-b71b-ec1b45ff999f', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_dc3fafb8-8d8a-432c-a9f4-386332b7720c', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_9b8c79b9-d8a6-487e-a10b-6d65dae9daff', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_26fe7c8b-15f4-419f-a7c7-c87461f1b69c', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_45c5c82b-befc-4b0c-97fa-673de0feb9dd', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_aca904de-b154-4f56-b255-b71de0be3060', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_9a131e04-2353-44f1-9c13-7c1b381ca553', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_66e9e85d-406b-4164-87f3-3ffbe4ff9162', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_e32f16e7-591e-4da4-a2b1-4b00ac4cb617', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_88eb15ea-9278-4e54-978b-9c211a4b834f', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_71b697fe-3da2-4c5b-a046-e5773b494e7b', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0454e926-10fb-4af4-82ea-082e6bdb7c5c', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_6ee2d3b4-a837-419f-b181-53a127a791e3', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_abfdb890-64d6-4e05-adf6-c020633fb1ab', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_ee27888e-c3fa-4731-a75b-b2f20efcafc3', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_c0d5978a-a66a-4faf-9d5b-5f5ebd7e7311', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_bdf24af9-bdca-467c-abca-04e215eb190c', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0dddbf5f-ce5d-4685-8361-cda906bef37c', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_a6800c32-4790-4e77-bfdf-7900ed44097a', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_0d0913a9-9f82-4418-9450-4cbf364ca9fb', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_ab1a2407-b7a4-4d69-9539-a05391945149', - 's3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_bcfde315-905f-475c-a3c3-3a3556e53fe4'] +manifest_files = ['s3://haystac-pmo-athena/results/0.1664769366230633/20231129_180352_00031_3yekz_90fc7ac6-1be8-42a3-8485-f5fda039a23b'] try: with Timing("read parquet from athena"): diff --git a/cluster/run_cluster.py b/cluster/run_cluster.py index 1a00115..cbe4461 100644 --- a/cluster/run_cluster.py +++ b/cluster/run_cluster.py @@ -5,10 +5,16 @@ from minerva.pier import Pier ########### PREP ############################ DASK_BASE = "ami-0399a4f70ca684620" # dask on ubuntu 22.04 x86 +NUM_WORK = int(sys.argv[1]) + +if sys.argv[2] == "large": + WORKER_TYPE = "r5.xlarge" +else: + WORKER_TYPE = "m5.large" def worker(pier, n): mach = pier.machine(ami = DASK_BASE, - instance_type = "m5.large", + instance_type = WORKER_TYPE, username = "ubuntu", name = f"dask-worker-{n}", variables = {"type": "worker", @@ -18,7 +24,7 @@ def worker(pier, n): def scheduler(pier): mach = pier.machine(ami = DASK_BASE, - instance_type = "m5.large", # "r5.xlarge", + instance_type = "m5.large", username = "ubuntu", name = f"dask-scheduler", variables = {"type": "scheduler"}, @@ -35,7 +41,7 @@ pier = m.pier(subnet_id = "subnet-05eb26d8649a093e1", # project-subnet-public key_pair = ("Ari-Brown-HAY", "~/.ssh/Ari-Brown-HAY.pem"), iam = "Minerva") -cluster = pier.cluster(scheduler, worker, num_workers=int(sys.argv[1])) +cluster = pier.cluster(scheduler, worker, num_workers=NUM_WORK) cluster.start() print() diff --git a/dask_test.py b/dask_test.py deleted file mode 100644 index c46eb11..0000000 --- a/dask_test.py +++ /dev/null @@ -1,124 +0,0 @@ -import sys -import minerva -from minerva.timing import Timing -import dask -import dask.dataframe as dd -import time - -#dask.config.set({'distributed.worker.multiprocessing-method': 'fork'}) - -import dask.distributed -from dask.distributed import Client - -m = minerva.Minerva("hay") - -print(f"connecting to {sys.argv[1]}") -client = Client(sys.argv[1]) - -#manifest_files = ['s3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_b311740c-02a5-42a5-9996-b17bdbf604ca', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1abe3b5f-ab17-401e-91e9-a5c4f3fa17b9', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c08442a6-74b8-4cf0-afc5-ab1a03f59eea', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_399a2786-68b5-4c40-961e-476451885f7f', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_8a699085-7d6b-426a-8a59-1ae4a6a183b9', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0ca91f4c-85cf-4c93-b039-4f307dde0a86', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_bf38065d-fde8-45d3-87eb-3a56479f8b57', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6a0ebea5-9a23-4fb5-8547-d4bb82f6a821', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_3464c3f6-fa5c-4d6b-bdf4-be387ff9dfda', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_9088425e-0c87-49ef-a883-92c217a715bb', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0517e42b-089b-4819-b166-fa21489485a6', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0197ff95-8c3c-4121-8077-a499927bb097', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_990579d4-0bae-4547-a630-e5a05341d264', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2c838980-65d6-4a3d-898d-bfcb63e361bd', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_930df083-3c4c-45d6-963e-51e47d3a704c', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_6739cc33-43e0-468d-a6d2-fa17161bbd5e', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_685d81de-ffff-4629-bfb2-4bca3941474b', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_41e3cb39-a917-460a-bd86-413df774806b', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_ce533449-b14f-439e-92dc-69abdeda706f', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c68ff721-8b97-4abf-8573-ac3fd617af97', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_04f13894-63d8-453c-ace7-87368511e94b', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_0d906712-e469-419a-8bf7-508b0b79848d', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_c514089d-c2eb-496a-a1a2-8501ecaf7d84', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_fe04a9d7-56ad-4e99-bbfa-8adb398af971', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_1ea10655-a063-468e-ad9e-0788eb15bd8d', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_7146bbe6-1278-43d3-a9e3-1a9e6a776105', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_add3ea7e-961f-4760-88b3-c839d4a344b3', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_2a00d610-18bf-4611-8f58-f47bdd484504', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_54ea410d-fa42-4ab5-bc2b-8179ad1cfb46', -# 's3://haystac-pmo-athena/results/0.32812059783186387/20231130_212053_00057_gqvh5_07aded26-d650-4852-9d3f-732409db3fd7'] - -manifest_files = ["s3://haystac-pmo-athena/results/f236b9a3-8aa7-4e29-b4c7-1fee5981a0ad.csv"] - -def full_s3(obj): - return "s3://" + obj.bucket_name + "/" + obj.key - -try: - athena = m.athena("s3://haystac-pmo-athena/") - records = 1_000_000_000 - - # Iteratively load files and scatter them to the cluster - with Timing("read parquet from athena"): - # Delta Table - #files = m.s3.ls("s3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/") - #parqs = [full_s3(o) for o in files if o.key.endswith('.parquet')] - - # 20 partitions at a time, concat 20 at a time - #df = dd.read_parquet(parqs[0:20])#.astype({"agent": "category", - # # "timestamp": "datetime64[ns, UTC]"}) - #for i in range(20, len(parqs), 20): - # new = dd.read_parquet(parqs[i:i + 20])#.astype({"agent": "category", - # # "timestamp": "datetime64[ns, UTC]"}) - # df = df.concat(new) - - # 20 partitions at a time, concat all at once - #df = dd.concat([dd.read_parquet(parqs[i:i + 20], engine='fastparquet') for i in range(0, len(parqs), 20)]) - - # All partitions at once - #df = dd.read_parquet(parqs, engine='fastparquet') - - # Single CSV file - #query = athena.execute(f"select * from trajectories.kitware limit {records}", format="csv") - #query.finish() - #df = dd.read_csv(query.info_cache['ResultConfiguration']['OutputLocation']) - df = dd.read_csv(manifest_files[0]) - - # Unload to Parquet files - #query = athena.query(f"select * from trajectories.kitware limit {records}") - #df = dd.read_parquet(query.manifest_files()) - #df = dd.read_parquet(manifest_files, engine='fastparquet') - - with Timing("partitioning"): - divisions = list(range(0, 10001)) - df = df.set_index('agent', divisions=divisions) - - with Timing("persisting"): - dp = df.persist() - print(dp) - - #import IPython - #IPython.embed() - - with Timing("memory usage"): - print(dp.get_partition(400).memory_usage()) - - with Timing("count()"): - print(dp.count()) - - with Timing("memory usage"): - print(dp.get_partition(400).memory_usage()) - - with Timing("mean latitude"): - print(dp.groupby(dp.index).latitude.mean()) - - with Timing("count()"): - print(dp.count()) - - #with Timing("mean longitude"): - # print(dp.groupby(dp.index).longitude.mean().compute()) - -finally: - ########## FIN ####################### - print("closing client") - client.close() - #print("end") - #input() - diff --git a/minerva/machine.py b/minerva/machine.py index 1044ce4..7e84aea 100644 --- a/minerva/machine.py +++ b/minerva/machine.py @@ -113,7 +113,7 @@ class Machine: self.description = resp['Reservations'][0]['Instances'][0] self.public_ip = self.description['PublicIpAddress'] - print(f"\t{self.name} ({self.info['InstanceId']}) => {self.public_ip} ({self.private_ip})") + print(f"\t{self.name} ({self.info['InstanceId']}\t- {self.instance_type}) => {self.public_ip} ({self.private_ip})") ip = self.public_ip or self.private_ip self.ssh = Connection(ip, diff --git a/minerva/redshift.py b/minerva/redshift.py index 0258b9f..3724511 100644 --- a/minerva/redshift.py +++ b/minerva/redshift.py @@ -12,12 +12,19 @@ from minerva import parallel_map pp = pprint.PrettyPrinter(indent=4) class Redshift: - def __init__(self, handler, output, db=None, cluster=None): - self.handler = handler - self.client = handler.session.client("redshift-data") - self.output = output - self.database = db - self.cluster = cluster + def __init__(self, handler, output, db=None, cluster=None, workgroup=None, secret=None): + self.handler = handler + self.client = handler.session.client("redshift-data") + self.output = output + self.database = db + self.cluster = cluster + self.workgroup = workgroup + self.secret = secret + + if workgroup: + cli = handler.session.client("redshift-serverless") + wg = cli.get_workgroup(workgroupName=workgroup) + self.rpus = wg['workgroup']['baseCapacity'] def query(self, sql): q = Query(self, sql) @@ -41,31 +48,52 @@ class Execute: self.client = redshift.client self.sql = sql self.info_cache = None + self.ds = None + self.files = None + self.temps = [] def query(self): return self.sql + def run(self): - resp = self.client.execute_statement(Sql=self.query(), - Database=self.redshift.database, - ClusterIdentifier=self.redshift.cluster) + if self.redshift.cluster: + resp = self.client.execute_statement(Sql=self.query(), + Database=self.redshift.database, + ClusterIdentifier=self.redshift.cluster) + else: + resp = self.client.execute_statement(Sql=self.query(), + Database=self.redshift.database, + SecretArn=self.redshift.secret, + WorkgroupName=self.redshift.workgroup) + self.query_id = resp['Id'] return resp + def status(self): return self.info()['Status'] + def info(self): res = self.client.describe_statement(Id=self.query_id) self.info_cache = res return self.info_cache + + # Block until the SQL has finished running def finish(self): stat = self.status() while stat in ['SUBMITTED', 'PICKED', 'STARTED']: time.sleep(5) stat = self.status() + pp.pprint(self.info_cache) + self.runtime = self.info_cache['UpdatedAt'] - self.info_cache['CreatedAt'] + + if self.redshift.rpus: + self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0 # $0.36 / RPU-hour + return stat # finalized state @@ -75,12 +103,22 @@ class Query(Execute): def query(self): self.out = os.path.join(self.redshift.output, str(random.random())) - query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \ - f"iam_role default " + \ - f"format as {self.DATA_STYLE} " + \ - f"manifest" + #query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \ + # f"iam_role default " + \ + # f"format as {self.DATA_STYLE} " + \ + # f"manifest" + + query = f""" +create temp table temp_data as {self.sql}; +unload ('select * from temp_data') to {repr(self.out)} +iam_role default +format as {self.DATA_STYLE} +manifest; +drop table temp_data; +""" return query + def manifest_files(self): status = self.finish() @@ -102,9 +140,29 @@ class Query(Execute): else: return status # canceled or error + def results(self): - #local = [self.handler.s3.download(f) for f in self.manifest_files()] - local = parallel_map(self.handler.s3.download, self.manifest_files()) - self.ds = pa.dataset.dataset(local) + self.temps = [self.handler.s3.download(f) for f in self.manifest_files()] + #local = parallel_map(self.handler.s3.download, self.manifest_files()) + self.ds = pa.dataset.dataset(self.temps) return self.ds + + # Return scalar results + # Abstracts away a bunch of keystrokes + def scalar(self): + return self.results().head(1)[0][0].as_py() + + + def __enter__(self): + return self + + + def __exit__(self, exception_type, exception_value, exception_traceback): + self.close() + + + def close(self): + if self.temps: + for file in self.temps: + os.remove(file)