forked from bellwether/minerva
added helpers for local files, loading templates, and an example for canceling queries
This commit is contained in:
parent
5bd2218612
commit
ae3173b510
6 changed files with 29 additions and 19 deletions
|
|
@ -1,17 +1,15 @@
|
||||||
import minerva
|
import minerva
|
||||||
|
import sys
|
||||||
|
|
||||||
m = minerva.Minerva("hay-te")
|
m = minerva.Minerva("hay-te")
|
||||||
|
|
||||||
athena = m.athena("s3://haystac-te-athena/")
|
athena = m.athena("s3://haystac-te-athena/")
|
||||||
file = "/tmp/queries.txt"
|
file = sys.argv[1] # "/tmp/queries.txt"
|
||||||
|
|
||||||
with open(file, 'r') as f:
|
with open(file, 'r') as f:
|
||||||
txt = f.read()
|
txt = f.read()
|
||||||
|
|
||||||
for line in txt.split("\n"):
|
for line in txt.split("\n"):
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
|
|
||||||
print(line)
|
print(line)
|
||||||
athena.cancel(line)
|
athena.cancel(line)
|
||||||
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
from .parallel import parallel_map
|
from .helpers import parallel_map, local, load_template
|
||||||
|
|
||||||
from .athena import Athena
|
from .athena import Athena
|
||||||
from .redshift import Redshift
|
from .redshift import Redshift
|
||||||
|
|
@ -18,6 +18,8 @@ __all__ = [
|
||||||
"Machine",
|
"Machine",
|
||||||
"Pier",
|
"Pier",
|
||||||
"Minerva",
|
"Minerva",
|
||||||
"parallel_map"
|
"parallel_map",
|
||||||
|
"local",
|
||||||
|
"load_template"
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,21 +8,10 @@ import pyarrow.dataset
|
||||||
import pprint
|
import pprint
|
||||||
import datetime
|
import datetime
|
||||||
import dask.dataframe as dd
|
import dask.dataframe as dd
|
||||||
from minerva import parallel_map
|
from minerva import parallel_map, local, load_template
|
||||||
from mako.template import Template
|
|
||||||
|
|
||||||
pp = pprint.PrettyPrinter(indent=4)
|
pp = pprint.PrettyPrinter(indent=4)
|
||||||
|
|
||||||
# Get full path of fname
|
|
||||||
def local(fname):
|
|
||||||
return os.path.join(os.path.abspath(os.path.dirname(__file__)), fname)
|
|
||||||
|
|
||||||
def load_sql(path, **params):
|
|
||||||
with open(path, 'r') as f:
|
|
||||||
query = f.read()
|
|
||||||
|
|
||||||
return Template(query).render(**params)
|
|
||||||
|
|
||||||
class Athena:
|
class Athena:
|
||||||
def __init__(self, handler, output=None):
|
def __init__(self, handler, output=None):
|
||||||
self.handler = handler
|
self.handler = handler
|
||||||
|
|
|
||||||
5
minerva/athena/union_tables.sql
Normal file
5
minerva/athena/union_tables.sql
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
create table ${dest}
|
||||||
|
with (format = 'PARQUET', external_location = ${repr(output)})
|
||||||
|
as
|
||||||
|
${tables}
|
||||||
|
|
||||||
|
|
@ -1,5 +1,8 @@
|
||||||
|
import os
|
||||||
|
import inspect
|
||||||
import math
|
import math
|
||||||
from joblib import Parallel, delayed
|
from joblib import Parallel, delayed
|
||||||
|
from mako.template import Template
|
||||||
|
|
||||||
# If you have a list of 100 elements and want to process it with 8 cores,
|
# If you have a list of 100 elements and want to process it with 8 cores,
|
||||||
# it will split it into 8 chunks (7 chunks of 13, 1 chunk of 9). `func` is
|
# it will split it into 8 chunks (7 chunks of 13, 1 chunk of 9). `func` is
|
||||||
|
|
@ -31,3 +34,15 @@ def parallel_map(func=None, data=None, cores=8):
|
||||||
# Flatten the nested lists
|
# Flatten the nested lists
|
||||||
return [val for r in res for val in r]
|
return [val for r in res for val in r]
|
||||||
|
|
||||||
|
|
||||||
|
# Get full path of fname
|
||||||
|
def local(fname):
|
||||||
|
return os.path.join(inspect.stack()[0][1], fname)
|
||||||
|
|
||||||
|
|
||||||
|
def load_template(path, **params):
|
||||||
|
with open(path, 'r') as f:
|
||||||
|
query = f.read()
|
||||||
|
|
||||||
|
return Template(query).render(**params)
|
||||||
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "minerva"
|
name = "minerva"
|
||||||
version = "0.7.0"
|
version = "0.7.1"
|
||||||
description = "Easier access to AWS Athena and Redshift"
|
description = "Easier access to AWS Athena and Redshift"
|
||||||
authors = [
|
authors = [
|
||||||
"Ari Brown <ari@airintech.com>",
|
"Ari Brown <ari@airintech.com>",
|
||||||
|
|
@ -22,3 +22,4 @@ pyarrow = "^14.0.1"
|
||||||
joblib = "^1.1.0"
|
joblib = "^1.1.0"
|
||||||
fabric = "^3.0.0"
|
fabric = "^3.0.0"
|
||||||
s3fs = "2023.6.0"
|
s3fs = "2023.6.0"
|
||||||
|
mako = ">1.2.0"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue