forked from bellwether/minerva
restructuring as a library with athena and redshift
parent adf909608d
commit c32f8359e7
8 changed files with 18 additions and 8 deletions
0    minerva/__init__.py    Normal file
BIN    minerva/__pycache__/__init__.cpython-310.pyc    Normal file
Binary file not shown.
BIN    minerva/__pycache__/blueshift.cpython-310.pyc    Normal file
Binary file not shown.
BIN    minerva/__pycache__/minerva.cpython-310.pyc    Normal file
Binary file not shown.
100    minerva/blueshift.py    Normal file
@@ -0,0 +1,100 @@
import boto3
import os
import random
import time
import pyarrow as pa
import pyarrow.dataset
import pprint
import json
import datetime

pp = pprint.PrettyPrinter(indent=4)


class Redshift:
    def __init__(self, profile, output, db=None, cluster=None):
        self.session = boto3.session.Session(profile_name=profile)
        self.redshift = self.session.client("redshift-data")
        self.output = output
        self.database = db
        self.cluster = cluster

    def query(self, sql):
        q = Query(self, sql)
        q.run()
        return q

    def download(self, s3):
        # An s3 URL looks like s3://bucket/key/parts..., so index 2 is
        # the bucket and everything after it is the key.
        bucket = s3.split("/")[2]
        file = os.path.join(*s3.split("/")[3:])
        tmp = f"/tmp/{random.random()}.bin"
        self.session.client('s3').download_file(bucket, file, tmp)

        return tmp


class Query:
    DATA_STYLE = 'parquet'

    def __init__(self, handler, sql):
        self.handler = handler
        self.redshift = handler.redshift
        self.sql = sql
        self.out = None

    def run(self):
        self.out = os.path.join(self.handler.output,
                                str(random.random()))
        query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \
                "iam_role default " + \
                f"format as {self.DATA_STYLE} " + \
                "manifest"

        resp = self.redshift.execute_statement(Sql=query,
                                               Database=self.handler.database,
                                               ClusterIdentifier=self.handler.cluster)
        self.query_id = resp['Id']
        return resp

    def status(self):
        return self.info()['Status']

    def info(self):
        res = self.redshift.describe_statement(Id=self.query_id)
        return res

    def results(self):
        tiedot = self.info()  # "tiedot" is Finnish for "details"
        status = tiedot['Status']

        while status in ['SUBMITTED', 'PICKED', 'STARTED']:
            time.sleep(5)
            tiedot = self.info()
            status = tiedot['Status']

        if status == "FINISHED":
            # Because we're using `UNLOAD`, we get a manifest of the files
            # that make up our data.
            files = self.manifest(tiedot)
            files = [f.strip() for f in files if f.strip()]  # filter empty

            # TODO parallelize this
            local = [self.handler.download(f) for f in files]
            self.ds = pa.dataset.dataset(local)

            self.runtime = tiedot['UpdatedAt'] - tiedot['CreatedAt']

            return self.ds
        else:
            print("Error:")
            pp.pprint(tiedot)
            return status  # canceled or error

    def manifest(self, tiedot):
        # Redshift writes the manifest directly alongside the unloaded
        # parts, at the unload path with "manifest" appended.
        manif = self.out + "manifest"
        tmp = self.handler.download(manif)
        with open(tmp, "r") as f:
            js = json.load(f)

        return [e['url'] for e in js['entries']]
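For orientation, here is a minimal usage sketch of the Redshift wrapper above. The profile name, database, cluster identifier, S3 prefix, and table are hypothetical placeholders, not values from this commit:

from minerva.blueshift import Redshift

# All resource names below are hypothetical examples.
rs = Redshift(profile="my-profile",
              output="s3://my-bucket/unloads/",
              db="dev",
              cluster="my-cluster")

q = rs.query("select * from events limit 100")
ds = q.results()               # polls until FINISHED, then downloads the parts
print(ds.to_table().num_rows)  # materialize the pyarrow dataset

The manifest that Query.manifest parses is the JSON file Redshift writes next to the unloaded parts; the code relies only on its entries[*].url fields.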
90    minerva/minerva.py    Normal file
@@ -0,0 +1,90 @@
import boto3
import os
import random
import time
import pyarrow as pa
import pyarrow.dataset
import pprint
import datetime

pp = pprint.PrettyPrinter(indent=4)


class Athena:
    def __init__(self, profile, output):
        self.session = boto3.session.Session(profile_name=profile)
        self.athena = self.session.client("athena")
        self.output = output

    def query(self, sql):
        q = Query(self, sql)
        q.run()
        return q

    def download(self, s3):
        # An s3 URL looks like s3://bucket/key/parts..., so index 2 is
        # the bucket and everything after it is the key.
        bucket = s3.split("/")[2]
        file = os.path.join(*s3.split("/")[3:])
        tmp = f"/tmp/{random.random()}.bin"
        self.session.client('s3').download_file(bucket, file, tmp)

        return tmp


class Query:
    DATA_STYLE = 'parquet'

    def __init__(self, handler, sql):
        self.handler = handler
        self.athena = handler.athena
        self.sql = sql

    def run(self):
        out = os.path.join(self.handler.output,
                           str(random.random()))
        config = {"OutputLocation": out}
        query = f"unload ({self.sql}) to {repr(out)} " + \
                f"with (format = '{self.DATA_STYLE}')"

        resp = self.athena.start_query_execution(QueryString=query,
                                                 ResultConfiguration=config)
        self.query_id = resp['QueryExecutionId']
        return resp

    def status(self):
        # Unlike the Redshift wrapper, this returns the Status dict;
        # the state string lives under its 'State' key.
        return self.info()['Status']

    def info(self):
        res = self.athena.get_query_execution(QueryExecutionId=self.query_id)
        return res['QueryExecution']

    def results(self):
        tiedot = self.info()  # "tiedot" is Finnish for "details"
        status = tiedot['Status']['State']

        while status in ['QUEUED', 'RUNNING']:
            time.sleep(5)
            tiedot = self.info()
            status = tiedot['Status']['State']

        if status == "SUCCEEDED":
            # Because we're using `UNLOAD`, we get a manifest of the files
            # that make up our data.
            files = self.manifest(tiedot).strip().split("\n")
            files = [f.strip() for f in files if f.strip()]  # filter empty

            # TODO parallelize this
            local = [self.handler.download(f) for f in files]
            self.ds = pa.dataset.dataset(local)

            ms = tiedot['Statistics']['TotalExecutionTimeInMillis']
            self.runtime = datetime.timedelta(seconds=ms / 1000)

            return self.ds
        else:
            return status  # canceled or error

    def manifest(self, tiedot):
        # Athena's data manifest is plain text, one S3 URL per line.
        manif = tiedot['Statistics']['DataManifestLocation']
        tmp = self.handler.download(manif)
        with open(tmp, "r") as f:
            return f.read()
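The Athena side mirrors the Redshift one. Again a sketch with placeholder names, not values from the commit:

from minerva.minerva import Athena

# Hypothetical profile and output location.
ath = Athena(profile="my-profile",
             output="s3://my-bucket/athena-out/")

q = ath.query("select * from events limit 100")
ds = q.results()  # polls through QUEUED/RUNNING, then builds a pyarrow dataset
print(ds.to_table())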
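Both files leave a "TODO parallelize this" on the serial download loop. Since the work is I/O-bound, a thread pool from the standard library would be one way to discharge it; this is a sketch, not part of the commit. One caveat: download() creates an S3 client from a shared boto3 session on every call, and client creation from multiple threads can race, so a production version would create the client once up front.

from concurrent.futures import ThreadPoolExecutor

def download_all(handler, files, workers=8):
    # Fetch every unloaded part concurrently; pool.map preserves input
    # order, so the result lines up with `files`.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(handler.download, files))

# drop-in for the comprehension in results():
#   local = download_all(self.handler, files)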