From 2613866fa33b92fc0c0e6a578f61af48b87c0cb0 Mon Sep 17 00:00:00 2001 From: Ari Brown Date: Fri, 18 Aug 2023 12:54:45 -0400 Subject: [PATCH] added comments and dropped useless file --- minerva/athena.py | 17 +++++++- minerva/blueshift.py | 100 ------------------------------------------- 2 files changed, 16 insertions(+), 101 deletions(-) delete mode 100644 minerva/blueshift.py diff --git a/minerva/athena.py b/minerva/athena.py index fe059a6..d4738b6 100644 --- a/minerva/athena.py +++ b/minerva/athena.py @@ -11,16 +11,19 @@ from minerva import parallel_map pp = pprint.PrettyPrinter(indent=4) class Athena: - def __init__(self, handler, output): + def __init__(self, handler, output=None): self.handler = handler self.client = handler.session.client("athena") self.output = output + # For when you want to receive the results of something def query(self, sql, params=[]): q = Query(self, sql, params) q.run() return q + # For when you want to send a query to run, but there aren't results + # (like a DDL query for creating databases and tables etc) def execute(self, sql, params=[]): e = Execute(self, sql, params) e.run() @@ -41,9 +44,11 @@ class Execute: self.info_cache = None self.files = [] + # The string of the query def query(self): return self.sql + # Send the SQL to Athena for running def run(self): config = {"OutputLocation": self.athena.output} if self.params: @@ -57,14 +62,17 @@ class Execute: self.query_id = resp['QueryExecutionId'] return resp + # The status of the SQL (running, queued, succeeded, etc.) 
def status(self): return self.info()['Status']['State'] + # Get the basic information on the SQL def info(self): res = self.client.get_query_execution(QueryExecutionId=self.query_id) self.info_cache = res['QueryExecution'] return self.info_cache + # Block until the SQL has finished running def finish(self): stat = self.status() while stat in ['QUEUED', 'RUNNING']: @@ -79,6 +87,7 @@ class Execute: class Query(Execute): DATA_STYLE = 'parquet' + # Automatically includes unloading the results to Parquet format def query(self): out = os.path.join(self.athena.output, str(random.random())) @@ -86,6 +95,9 @@ class Query(Execute): f"with (format = '{self.DATA_STYLE}')" return query + # Gets the files that are listed in the manifest (from the UNLOAD part of + # the statement) + # Blocks until the query has finished (because it calls `self.finish()`) def manifest_files(self): status = self.finish() @@ -109,6 +121,9 @@ class Query(Execute): raise #return status # canceled or error + # Blocks until the query has finished running and then returns you a pyarrow + # dataset of the results. 
+ # Calls `self.manifest_files()` which blocks via `self.finish()` def results(self): #local = [self.handler.s3.download(f) for f in self.manifest_files()] local = parallel_map(self.handler.s3.download, self.manifest_files()) diff --git a/minerva/blueshift.py b/minerva/blueshift.py deleted file mode 100644 index 3090005..0000000 --- a/minerva/blueshift.py +++ /dev/null @@ -1,100 +0,0 @@ -import boto3 -import os -import random -import time -import pyarrow as pa -import pyarrow.dataset -import pprint -import json -import datetime - -pp = pprint.PrettyPrinter(indent=4) - -class Redshift: - def __init__(self, profile, output, db=None, cluster=None): - self.session = boto3.session.Session(profile_name=profile) - self.redshift = self.session.client("redshift-data") - self.output = output - self.database = db - self.cluster = cluster - - def query(self, sql): - q = Query(self, sql) - q.run() - return q - - def download(self, s3): - bucket = s3.split("/")[2] - file = os.path.join(*s3.split("/")[3:]) - tmp = f"/tmp/{random.random()}.bin" - self.session.client('s3').download_file(bucket, file, tmp) - - return tmp - -class Query: - DATA_STYLE = 'parquet' - - def __init__(self, handler, sql): - self.handler = handler - self.redshift = handler.redshift - self.sql = sql - self.out = None - - def run(self): - self.out = os.path.join(self.handler.output, - str(random.random())) - query = f"unload ({repr(self.sql)}) to {repr(self.out)} " + \ - f"iam_role default " + \ - f"format as {self.DATA_STYLE} " + \ - f"manifest" - - resp = self.redshift.execute_statement(Sql=query, - Database=self.handler.database, - ClusterIdentifier=self.handler.cluster) - self.query_id = resp['Id'] - return resp - - def status(self): - return self.info()['Status'] - - def info(self): - res = self.redshift.describe_statement(Id=self.query_id) - return res - - def results(self): - tiedot = self.info() - status = tiedot['Status'] - - while status in ['SUBMITTED', 'PICKED', 'STARTED']: - time.sleep(5) - 
tiedot = self.info() - status = tiedot['Status'] - - if status == "FINISHED": - # Because we're using `UNLOAD`, we get a manifest of the files - # that make up our data. - files = self.manifest(tiedot) - files = [f.strip() for f in files if f.strip()] # filter empty - - # TODO parallelize this - local = [self.handler.download(f) for f in files] - self.ds = pa.dataset.dataset(local) - - self.runtime = tiedot['UpdatedAt'] - tiedot['CreatedAt'] - - return self.ds - else: - print("Error:") - pp.pprint(tiedot) - return status # canceled or error - - def manifest(self, tiedot): - manif = self.out + "manifest" - tmp = self.handler.download(manif) - with open(tmp, "r") as f: - js = json.load(f) - - return [e['url'] for e in js['entries']] - - -