forked from bellwether/minerva
added comments and dropped useless file
This commit is contained in:
parent 972e6dd210
commit 2613866fa3
2 changed files with 16 additions and 101 deletions
@@ -11,16 +11,19 @@ from minerva import parallel_map
 pp = pprint.PrettyPrinter(indent=4)

 class Athena:
-    def __init__(self, handler, output):
+    def __init__(self, handler, output=None):
         self.handler = handler
         self.client = handler.session.client("athena")
         self.output = output

+    # For when you want to receive the results of something
     def query(self, sql, params=[]):
         q = Query(self, sql, params)
         q.run()
         return q

+    # For when you want to send a query to run, but there aren't results
+    # (like a DML query for creating databases and tables etc)
     def execute(self, sql, params=[]):
         e = Execute(self, sql, params)
         e.run()
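For orientation, a minimal usage sketch of the class as it stands after this change. The `handler` object (anything exposing a boto3 `session`, as the constructor implies) and the S3 staging path are illustrative assumptions, not part of the diff:

# Hypothetical usage; `handler` and the bucket name are placeholders.
athena = Athena(handler, output="s3://example-bucket/athena-staging/")

# DDL-style statement: submit and move on, no result set expected.
athena.execute("create database if not exists sandbox")

# SELECT: returns a Query object whose results can be collected later.
q = athena.query("select * from sandbox.events where day = ?",
                 params=["2023-01-01"])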
@@ -41,9 +44,11 @@ class Execute:
         self.info_cache = None
         self.files = []

+    # The string of the query
     def query(self):
         return self.sql

+    # Send the SQL to Athena for running
     def run(self):
         config = {"OutputLocation": self.athena.output}
         if self.params:
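The hunk cuts off right where `run()` branches on `self.params`. Pieced together with the next hunk (which reads `resp['QueryExecutionId']`), the call presumably looks like the sketch below; `start_query_execution` and its `ExecutionParameters` list are boto3's real Athena API, but the exact branching is an assumption:

# Sketch of the truncated body of Execute.run(); the branching is assumed.
def run(self):
    config = {"OutputLocation": self.athena.output}
    kwargs = {"QueryString": self.query(), "ResultConfiguration": config}
    if self.params:
        # Athena binds positional `?` markers from a list of strings.
        kwargs["ExecutionParameters"] = [str(p) for p in self.params]
    resp = self.client.start_query_execution(**kwargs)
    self.query_id = resp['QueryExecutionId']
    return resp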
@@ -57,14 +62,17 @@ class Execute:
         self.query_id = resp['QueryExecutionId']
         return resp

+    # The status of the SQL (running, queued, succeeded, etc.)
     def status(self):
         return self.info()['Status']['State']

+    # Get the basic information on the SQL
     def info(self):
         res = self.client.get_query_execution(QueryExecutionId=self.query_id)
         self.info_cache = res['QueryExecution']
         return self.info_cache

+    # Block until the SQL has finished running
     def finish(self):
         stat = self.status()
         while stat in ['QUEUED', 'RUNNING']:
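`finish()` is truncated mid-loop; its body is presumably the usual poll-and-sleep pattern, and returning the terminal state would match `status = self.finish()` in a later hunk. A sketch, borrowing the 5-second cadence from the deleted Redshift file below:

# Sketch of the truncated polling body; the interval is a guess.
def finish(self):
    stat = self.status()
    while stat in ['QUEUED', 'RUNNING']:
        time.sleep(5)    # mirrors the 5s cadence in the deleted Redshift version
        stat = self.status()
    return stat          # SUCCEEDED, FAILED, or CANCELLED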
@@ -79,6 +87,7 @@ class Execute:
 class Query(Execute):
     DATA_STYLE = 'parquet'

+    # Automatically includes unloading the results to Parquet format
     def query(self):
         out = os.path.join(self.athena.output,
                            str(random.random()))
@@ -86,6 +95,9 @@ class Query(Execute):
                 f"with (format = '{self.DATA_STYLE}')"
         return query

+    # Gets the files that are listed in the manifest (from the UNLOAD part of
+    # the statement)
+    # Blocks until the query has finished (because it calls `self.finish()`)
     def manifest_files(self):
         status = self.finish()

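`manifest_files()` is also cut off after the `finish()` call. Given the new comment, it presumably reads the data manifest Athena writes for UNLOAD queries, one S3 URI per line. In the sketch below, the `DataManifestLocation` field of the execution statistics is real Athena API, and `self.handler.s3.download` appears in the next hunk; everything else, including the error handling, is conjecture:

# Conjectured shape of manifest_files(); only the API field names are certain.
def manifest_files(self):
    status = self.finish()
    if status != 'SUCCEEDED':
        pp.pprint(self.info_cache)
        raise RuntimeError(f"query ended as {status}")   # conjectured handling
    manifest = self.info()['Statistics']['DataManifestLocation']
    local = self.handler.s3.download(manifest)
    with open(local) as f:
        self.files = [line.strip() for line in f if line.strip()]
    return self.files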
@@ -109,6 +121,9 @@ class Query(Execute):
             raise
         #return status # canceled or error

+    # Blocks until the query has finished running and then returns you a pyarrow
+    # dataset of the results.
+    # Calls `self.manifest_files()` which blocks via `self.finish()`
     def results(self):
         #local = [self.handler.s3.download(f) for f in self.manifest_files()]
         local = parallel_map(self.handler.s3.download, self.manifest_files())
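`results()` continues past the visible lines, but the new comment pins down the contract: block, download the Parquet parts in parallel, return a pyarrow dataset. End-to-end use would then look like the following, where `to_table()` is standard pyarrow and the query text is a placeholder:

# Hypothetical end-to-end use of the Athena wrapper.
q = athena.query("select user_id, count(*) as n from events group by 1")
ds = q.results()          # blocks until Athena finishes, downloads the Parquet files
table = ds.to_table()     # materialize the dataset as a pyarrow Table
print(table.num_rows)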
@@ -1,100 +0,0 @@
-import boto3
-import os
-import random
-import time
-import pyarrow as pa
-import pyarrow.dataset
-import pprint
-import json
-import datetime
-
-pp = pprint.PrettyPrinter(indent=4)
-
-class Redshift:
-    def __init__(self, profile, output, db=None, cluster=None):
-        self.session = boto3.session.Session(profile_name=profile)
-        self.redshift = self.session.client("redshift-data")
-        self.output = output
-        self.database = db
-        self.cluster = cluster
-
-    def query(self, sql):
-        q = Query(self, sql)
-        q.run()
-        return q
-
-    def download(self, s3):
-        bucket = s3.split("/")[2]
-        file = os.path.join(*s3.split("/")[3:])
-        tmp = f"/tmp/{random.random()}.bin"
-        self.session.client('s3').download_file(bucket, file, tmp)
-
-        return tmp
-
-class Query:
-    DATA_STYLE = 'parquet'
-
-    def __init__(self, handler, sql):
-        self.handler = handler
-        self.redshift = handler.redshift
-        self.sql = sql
-        self.out = None
-
-    def run(self):
-        self.out = os.path.join(self.handler.output,
-                                str(random.random()))
-        query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \
-                f"iam_role default " + \
-                f"format as {self.DATA_STYLE} " + \
-                f"manifest"
-
-        resp = self.redshift.execute_statement(Sql=query,
-                                               Database=self.handler.database,
-                                               ClusterIdentifier=self.handler.cluster)
-        self.query_id = resp['Id']
-        return resp
-
-    def status(self):
-        return self.info()['Status']
-
-    def info(self):
-        res = self.redshift.describe_statement(Id=self.query_id)
-        return res
-
-    def results(self):
-        tiedot = self.info()
-        status = tiedot['Status']
-
-        while status in ['SUBMITTED', 'PICKED', 'STARTED']:
-            time.sleep(5)
-            tiedot = self.info()
-            status = tiedot['Status']
-
-        if status == "FINISHED":
-            # Because we're using `UNLOAD`, we get a manifest of the files
-            # that make up our data.
-            files = self.manifest(tiedot)
-            files = [f.strip() for f in files if f.strip()] # filter empty
-
-            # TODO parallelize this
-            local = [self.handler.download(f) for f in files]
-            self.ds = pa.dataset.dataset(local)
-
-            self.runtime = tiedot['UpdatedAt'] - tiedot['CreatedAt']
-
-            return self.ds
-        else:
-            print("Error:")
-            pp.pprint(tiedot)
-            return status # canceled or error
-
-    def manifest(self, tiedot):
-        manif = self.out + "manifest"
-        tmp = self.handler.download(manif)
-        with open(tmp, "r") as f:
-            js = json.load(f)
-
-        return [e['url'] for e in js['entries']]
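The `# TODO parallelize this` in the deleted file is exactly what the surviving Athena module resolves with `parallel_map` from minerva. That helper isn't shown in this commit; for I/O-bound S3 downloads it is plausibly a thread-pool map along these lines (an assumption, not minerva's actual code):

# Sketch of what minerva's parallel_map could be; worker count is arbitrary.
from concurrent.futures import ThreadPoolExecutor

def parallel_map(fn, items, workers=16):
    # Threads suffice here: S3 downloads block on network I/O, not the GIL.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fn, items))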