forked from bellwether/minerva
Various updates to export the main classes, update dependencies, and add an...
This commit is contained in:
parent
2c79e43f5f
commit
07d82c84d2
6 changed files with 424 additions and 12 deletions
|
|
@ -0,0 +1,7 @@
|
|||
from .minerva import Athena, Execute, Query
|
||||
|
||||
__all__ = [
|
||||
"Execute",
|
||||
"Query",
|
||||
"Athena"
|
||||
]
|
||||
|
|
@ -19,6 +19,11 @@ class Athena:
|
|||
q = Query(self, sql)
|
||||
q.run()
|
||||
return q
|
||||
|
||||
def execute(self, sql):
|
||||
e = Execute(self, sql)
|
||||
e.run()
|
||||
return e
|
||||
|
||||
def download(self, s3):
|
||||
bucket = s3.split("/")[2]
|
||||
|
|
@ -28,6 +33,44 @@ class Athena:
|
|||
|
||||
return tmp
|
||||
|
||||
class Execute:
|
||||
"""
|
||||
Execute will not return results, but will execute the SQL and return the final state.
|
||||
Execute is meant to be used for DML statements such as CREATE DATABASE/TABLE
|
||||
"""
|
||||
def __init__(self, handler, sql):
|
||||
self.handler = handler
|
||||
self.athena = handler.athena
|
||||
self.sql = sql
|
||||
|
||||
def run(self):
|
||||
out = os.path.join(self.handler.output,
|
||||
str(random.random()))
|
||||
config = {"OutputLocation": out}
|
||||
resp = self.athena.start_query_execution(QueryString=self.sql,
|
||||
ResultConfiguration=config)
|
||||
self.query_id = resp['QueryExecutionId']
|
||||
return resp
|
||||
|
||||
def status(self):
|
||||
return self.info()['Status']
|
||||
|
||||
def info(self):
|
||||
res = self.athena.get_query_execution(QueryExecutionId=self.query_id)
|
||||
return res['QueryExecution']
|
||||
|
||||
def execute(self):
|
||||
tiedot = self.info()
|
||||
status = tiedot['Status']['State']
|
||||
|
||||
while status in ['QUEUED', 'RUNNING']:
|
||||
time.sleep(5)
|
||||
tiedot = self.info()
|
||||
status = tiedot['Status']['State']
|
||||
|
||||
return status # finalized state
|
||||
|
||||
|
||||
class Query:
|
||||
DATA_STYLE = 'parquet'
|
||||
|
||||
|
|
@ -86,5 +129,3 @@ class Query:
|
|||
tmp = self.handler.download(manif)
|
||||
with open(tmp, "r") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue