Move parallel_map, local and load_template into minerva/helpers.py; bump to 0.7.1.

Review notes (text before the first diff header is ignored by patch tools):
  * helpers.local() resolves fname against the directory of the *calling*
    file (inspect.stack()[1]); stack()[0] is helpers.py itself and joining
    onto a file name yields an unusable path.
  * examples/cancel_queries.py keeps the empty-line guard, because
    txt.split("\n") yields a trailing "" for newline-terminated files.

diff --git a/examples/cancel_query.py b/examples/cancel_queries.py
similarity index 76%
rename from examples/cancel_query.py
rename to examples/cancel_queries.py
index bcfe65c..15c8e82 100644
--- a/examples/cancel_query.py
+++ b/examples/cancel_queries.py
@@ -1,17 +1,18 @@
 import minerva
+import sys
 
 m = minerva.Minerva("hay-te")
 athena = m.athena("s3://haystac-te-athena/")
 
-file = "/tmp/queries.txt"
+file = sys.argv[1] # "/tmp/queries.txt"
 
 with open(file, 'r') as f:
     txt = f.read()
 
 for line in txt.split("\n"):
     if not line:
         continue
 
     print(line)
     athena.cancel(line)
 
diff --git a/minerva/__init__.py b/minerva/__init__.py
index beb4cfa..5432993 100644
--- a/minerva/__init__.py
+++ b/minerva/__init__.py
@@ -1,4 +1,4 @@
-from .parallel import parallel_map
+from .helpers import parallel_map, local, load_template
 
 from .athena import Athena
 from .redshift import Redshift
@@ -18,6 +18,8 @@ __all__ = [
     "Machine",
     "Pier",
     "Minerva",
-    "parallel_map"
+    "parallel_map",
+    "local",
+    "load_template"
 ]
 
diff --git a/minerva/athena.py b/minerva/athena.py
index 099bd27..36e853f 100644
--- a/minerva/athena.py
+++ b/minerva/athena.py
@@ -8,21 +8,10 @@ import pyarrow.dataset
 import pprint
 import datetime
 import dask.dataframe as dd
-from minerva import parallel_map
-from mako.template import Template
+from minerva import parallel_map, local, load_template
 
 pp = pprint.PrettyPrinter(indent=4)
 
-# Get full path of fname
-def local(fname):
-    return os.path.join(os.path.abspath(os.path.dirname(__file__)), fname)
-
-def load_sql(path, **params):
-    with open(path, 'r') as f:
-        query = f.read()
-
-    return Template(query).render(**params)
-
 class Athena:
     def __init__(self, handler, output=None):
         self.handler = handler
diff --git a/minerva/athena/union_tables.sql b/minerva/athena/union_tables.sql
new file mode 100644
index 0000000..b876155
--- /dev/null
+++ b/minerva/athena/union_tables.sql
@@ -0,0 +1,5 @@
+create table ${dest}
+with (format = 'PARQUET', external_location = ${repr(output)})
+as
+${tables}
+
diff --git a/minerva/parallel.py b/minerva/helpers.py
similarity index 78%
rename from minerva/parallel.py
rename to minerva/helpers.py
index 5f4a182..6842993 100644
--- a/minerva/parallel.py
+++ b/minerva/helpers.py
@@ -1,5 +1,8 @@
+import os
+import inspect
 import math
 from joblib import Parallel, delayed
+from mako.template import Template
 
 # If you have a list of 100 elements and want to process it with 8 cores,
 # it will split it into 8 chunks (7 chunks of 13, 1 chunk of 9). `func` is
@@ -31,3 +34,15 @@ def parallel_map(func=None, data=None, cores=8):
 
     # Flatten the nested lists
     return [val for r in res for val in r]
+
+# Get full path of fname, resolved against the directory of the calling file
+def local(fname):
+    return os.path.join(os.path.dirname(os.path.abspath(inspect.stack()[1].filename)), fname)
+
+
+def load_template(path, **params):
+    with open(path, 'r') as f:
+        query = f.read()
+
+    return Template(query).render(**params)
+
diff --git a/pyproject.toml b/pyproject.toml
index f6efbb0..962b0a4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "minerva"
-version = "0.7.0"
+version = "0.7.1"
 description = "Easier access to AWS Athena and Redshift"
 authors = [
     "Ari Brown ",
@@ -22,3 +22,4 @@ pyarrow = "^14.0.1"
 joblib = "^1.1.0"
 fabric = "^3.0.0"
 s3fs = "2023.6.0"
+mako = ">1.2.0"