fixed a bug that occurred from refactoring

This commit is contained in:
Ari Brown 2024-02-23 10:55:53 -05:00
parent 64b2cd6c00
commit 98524d3be6
2 changed files with 13 additions and 9 deletions

View file

@ -24,7 +24,7 @@ class Redshift:
if workgroup: if workgroup:
cli = handler.session.client("redshift-serverless") cli = handler.session.client("redshift-serverless")
wg = cli.get_workgroup(workgroupName=workgroup) wg = cli.get_workgroup(workgroupName=workgroup)
self.rpus = wg['workgroup']['baseCapacity'] self.rpus = wg['workgroup']['maxCapacity'] # provide an upper bound
def query(self, sql): def query(self, sql):
q = Query(self, sql) q = Query(self, sql)
@ -107,11 +107,13 @@ class Execute:
# Block until the SQL has finished running # Block until the SQL has finished running
def finish(self): def finish(self):
stat = self.status() stat = self.status()
while stat in ['SUBMITTED', 'PICKED', 'STARTED']: while stat in ['SUBMITTED', 'PICKED', 'STARTED']:
time.sleep(5) time.sleep(5)
stat = self.status() stat = self.status()
self.update_values() self.update_values()
return self.status_cache
def update_values(self): def update_values(self):
self.status_cache = self.info_cache['Status'] self.status_cache = self.info_cache['Status']
@ -122,7 +124,6 @@ class Execute:
# $0.36 / RPU-hour # $0.36 / RPU-hour
self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0 self.cost = 0.36 * self.redshift.rpus * self.runtime.seconds / 3600.0
return self.status_cache
class Query(Execute): class Query(Execute):
@ -142,10 +143,10 @@ create temp table temp_data as {self.sql};
unload ('select * from temp_data') to {repr(self.out)} unload ('select * from temp_data') to {repr(self.out)}
iam_role default iam_role default
format as {self.DATA_STYLE} format as {self.DATA_STYLE}
parallel off
manifest; manifest;
drop table temp_data; drop table temp_data;
""" """
print(query)
return query return query
@ -156,10 +157,13 @@ drop table temp_data;
status = self.finish() status = self.finish()
if status == "FINISHED": if status == "FINISHED":
if self.info_cache['ResultRows'] != 0: # Because we're using `UNLOAD`, we get a manifest of the files
# Because we're using `UNLOAD`, we get a manifest of the files # that make up our data.
# that make up our data. manif = self.out + "manifest"
manif = self.out + "manifest"
# do we even have results? redshift doesn't create manifests when
# there are 0 results
if list(self.handler.s3.ls(manif)):
tmp = self.handler.s3.download(manif) tmp = self.handler.s3.download(manif)
with open(tmp, "r") as f: with open(tmp, "r") as f:
js = json.load(f) js = json.load(f)
@ -178,7 +182,7 @@ drop table temp_data;
# if it's not a list, then we've failed # if it's not a list, then we've failed
if type(self.manifest_files()) != type([]): if type(self.manifest_files()) != type([]):
raise Exception(f"""Query has status {self.status()} did not complete and raise Exception(f"""Query has status {self.status()} did not complete and
thus has no results""") thus has no results. Error is {self.info_cache.get('Error', '')}""")
self.temps = [self.handler.s3.download(f) for f in self.manifest_files()] self.temps = [self.handler.s3.download(f) for f in self.manifest_files()]
#local = parallel_map(self.handler.s3.download, self.manifest_files()) #local = parallel_map(self.handler.s3.download, self.manifest_files())

View file

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "minerva" name = "minerva"
version = "0.7.3" version = "0.7.4"
description = "Easier access to AWS Athena and Redshift" description = "Easier access to AWS Athena and Redshift"
authors = [ authors = [
"Ari Brown <ari@airintech.com>", "Ari Brown <ari@airintech.com>",