forked from bellwether/minerva
added parallel file downloading and dried up the code a touch
This commit is contained in:
parent
68bc346e24
commit
22746b6639
5 changed files with 16 additions and 11 deletions
|
|
@ -1,8 +1,8 @@
|
||||||
from .minerva import Athena, Execute, Query, Redshift, parallel_map
|
from .parallel import parallel_map
|
||||||
|
from .athena import Athena
|
||||||
|
from .redshift import Redshift
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"Execute",
|
|
||||||
"Query",
|
|
||||||
"Athena",
|
"Athena",
|
||||||
"Redshift",
|
"Redshift",
|
||||||
"parallel_map"
|
"parallel_map"
|
||||||
|
|
|
||||||
|
|
@ -49,12 +49,12 @@ class Execute:
|
||||||
return self.sql
|
return self.sql
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
resp = self.athena.start_query_execution(QueryString=self.sql)
|
resp = self.athena.start_query_execution(QueryString=self.query())
|
||||||
self.query_id = resp['QueryExecutionId']
|
self.query_id = resp['QueryExecutionId']
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
def status(self):
|
def status(self):
|
||||||
return self.info()['Status']
|
return self.info()['Status']['State']
|
||||||
|
|
||||||
def info(self):
|
def info(self):
|
||||||
res = self.athena.get_query_execution(QueryExecutionId=self.query_id)
|
res = self.athena.get_query_execution(QueryExecutionId=self.query_id)
|
||||||
|
|
@ -62,8 +62,10 @@ class Execute:
|
||||||
return self.info_cache
|
return self.info_cache
|
||||||
|
|
||||||
def finish(self):
|
def finish(self):
|
||||||
while stat := self.status() in ['QUEUED', 'RUNNING']:
|
stat = self.status()
|
||||||
|
while stat in ['QUEUED', 'RUNNING']:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
stat = self.status()
|
||||||
|
|
||||||
return stat # finalized state
|
return stat # finalized state
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,8 @@ from joblib import Parallel, delayed
|
||||||
#
|
#
|
||||||
# parallel_map(say, [str(i) for i in range(10)], cores=4)
|
# parallel_map(say, [str(i) for i in range(10)], cores=4)
|
||||||
def parallel_map(func=None, data=None, cores=8):
|
def parallel_map(func=None, data=None, cores=8):
|
||||||
size = len(data) // cores
|
cores = min(cores, len(data))
|
||||||
|
size = len(data) // cores
|
||||||
groups = [data[i:i + size] for i in range(0, len(data), size)]
|
groups = [data[i:i + size] for i in range(0, len(data), size)]
|
||||||
|
|
||||||
def wrapper_func(fs):
|
def wrapper_func(fs):
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import pyarrow as pa
|
||||||
import pyarrow.dataset
|
import pyarrow.dataset
|
||||||
import pprint
|
import pprint
|
||||||
import datetime
|
import datetime
|
||||||
|
import json
|
||||||
from minerva import parallel_map
|
from minerva import parallel_map
|
||||||
|
|
||||||
pp = pprint.PrettyPrinter(indent=4)
|
pp = pprint.PrettyPrinter(indent=4)
|
||||||
|
|
@ -66,8 +67,10 @@ class Execute:
|
||||||
return self.info_cache
|
return self.info_cache
|
||||||
|
|
||||||
def finish(self):
|
def finish(self):
|
||||||
while stat := self.status() in ['SUBMITTED', 'PICKED', 'STARTED']:
|
stat = self.status()
|
||||||
|
while stat in ['SUBMITTED', 'PICKED', 'STARTED']:
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
stat = self.status()
|
||||||
|
|
||||||
return stat # finalized state
|
return stat # finalized state
|
||||||
|
|
||||||
|
|
@ -87,9 +90,9 @@ class Query(Execute):
|
||||||
def manifest_files(self):
|
def manifest_files(self):
|
||||||
status = self.finish()
|
status = self.finish()
|
||||||
|
|
||||||
if status == "SUCCEEDED":
|
if status == "FINISHED":
|
||||||
# Track the runtime
|
# Track the runtime
|
||||||
self.runtime = tiedot['UpdatedAt'] - tiedot['CreatedAt']
|
self.runtime = self.info_cache['UpdatedAt'] - self.info_cache['CreatedAt']
|
||||||
|
|
||||||
# Because we're using `UNLOAD`, we get a manifest of the files
|
# Because we're using `UNLOAD`, we get a manifest of the files
|
||||||
# that make up our data.
|
# that make up our data.
|
||||||
|
|
|
||||||
1
test2.py
1
test2.py
|
|
@ -7,7 +7,6 @@ red = m.Redshift("hay", "s3://haystac-pmo-athena/",
|
||||||
db="dev",
|
db="dev",
|
||||||
cluster="redshift-cluster-1")
|
cluster="redshift-cluster-1")
|
||||||
query = red.query("select count(*) from myspectrum_schema.kitware")
|
query = red.query("select count(*) from myspectrum_schema.kitware")
|
||||||
print(query)
|
|
||||||
data = query.results()
|
data = query.results()
|
||||||
pp.pprint(data.head(10))
|
pp.pprint(data.head(10))
|
||||||
print(query.runtime)
|
print(query.runtime)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue