diff --git a/minerva/__init__.py b/minerva/__init__.py index d9b4770..4c0d10b 100644 --- a/minerva/__init__.py +++ b/minerva/__init__.py @@ -1,8 +1,8 @@ -from .minerva import Athena, Execute, Query, Redshift, parallel_map +from .parallel import parallel_map +from .athena import Athena +from .redshift import Redshift __all__ = [ - "Execute", - "Query", "Athena", "Redshift", "parallel_map" diff --git a/minerva/athena.py b/minerva/athena.py index 9c423d2..85b2332 100644 --- a/minerva/athena.py +++ b/minerva/athena.py @@ -49,12 +49,12 @@ class Execute: return self.sql def run(self): - resp = self.athena.start_query_execution(QueryString=self.sql) + resp = self.athena.start_query_execution(QueryString=self.query()) self.query_id = resp['QueryExecutionId'] return resp def status(self): - return self.info()['Status'] + return self.info()['Status']['State'] def info(self): res = self.athena.get_query_execution(QueryExecutionId=self.query_id) @@ -62,8 +62,10 @@ class Execute: return self.info_cache def finish(self): - while stat := self.status() in ['QUEUED', 'RUNNING']: + stat = self.status() + while stat in ['QUEUED', 'RUNNING']: time.sleep(5) + stat = self.status() return stat # finalized state diff --git a/minerva/parallel.py b/minerva/parallel.py index fd44c7f..bf268b1 100644 --- a/minerva/parallel.py +++ b/minerva/parallel.py @@ -10,7 +10,8 @@ from joblib import Parallel, delayed # # parallel_map(say, [str(i) for i in range(10)], cores=4) def parallel_map(func=None, data=None, cores=8): - size = len(data) // cores + cores = min(cores, len(data)) + size = len(data) // cores groups = [data[i:i + size] for i in range(0, len(data), size)] def wrapper_func(fs): diff --git a/minerva/redshift.py b/minerva/redshift.py index f7cf971..92cacb2 100644 --- a/minerva/redshift.py +++ b/minerva/redshift.py @@ -6,6 +6,7 @@ import pyarrow as pa import pyarrow.dataset import pprint import datetime +import json from minerva import parallel_map pp = pprint.PrettyPrinter(indent=4) @@ -66,8 +67,10 @@ class Execute: return self.info_cache def finish(self): - while stat := self.status() in ['SUBMITTED', 'PICKED', 'STARTED']: + stat = self.status() + while stat in ['SUBMITTED', 'PICKED', 'STARTED']: time.sleep(5) + stat = self.status() return stat # finalized state @@ -87,9 +90,9 @@ class Query(Execute): def manifest_files(self): status = self.finish() - if status == "SUCCEEDED": + if status == "FINISHED": # Track the runtime - self.runtime = tiedot['UpdatedAt'] - tiedot['CreatedAt'] + self.runtime = self.info_cache['UpdatedAt'] - self.info_cache['CreatedAt'] # Because we're using `UNLOAD`, we get a manifest of the files # that make up our data. diff --git a/test2.py b/test2.py index 04b5144..eea8b39 100644 --- a/test2.py +++ b/test2.py @@ -7,7 +7,6 @@ red = m.Redshift("hay", "s3://haystac-pmo-athena/", db="dev", cluster="redshift-cluster-1") query = red.query("select count(*) from myspectrum_schema.kitware") -print(query) data = query.results() pp.pprint(data.head(10)) print(query.runtime)