commit 0f033a54b7553f0c54f17c61a984500c184236a6
Author: Ari Brown
Date:   Thu Jul 27 13:46:44 2023 -0400

    Initial commit

diff --git a/__pycache__/access.cpython-310.pyc b/__pycache__/access.cpython-310.pyc
new file mode 100644
index 0000000..52831fe
Binary files /dev/null and b/__pycache__/access.cpython-310.pyc differ
diff --git a/access.py b/access.py
new file mode 100644
index 0000000..2befd90
--- /dev/null
+++ b/access.py
@@ -0,0 +1,82 @@
+import boto3
+import os
+import random
+import time
+import pyarrow as pa
+import pyarrow.dataset
+import pprint
+
+pp = pprint.PrettyPrinter(indent=4)
+
+class Athena:
+    def __init__(self, profile, output):
+        self.session = boto3.session.Session(profile_name=profile)
+        self.athena = self.session.client("athena")
+        self.output = output
+
+    def query(self, sql):
+        q = Query(self, sql)
+        q.run()
+        return q
+
+    def download(self, s3):
+        # Split "s3://bucket/key" into the bucket name and the object key
+        bucket = s3.split("/")[2]
+        file = os.path.join(*s3.split("/")[3:])
+        tmp = f"/tmp/{random.random()}.bin"
+        self.session.client('s3').download_file(bucket, file, tmp)
+
+        return tmp
+
+class Query:
+    DATA_STYLE = 'parquet'
+
+    def __init__(self, handler, sql):
+        self.handler = handler
+        self.athena = handler.athena
+        self.sql = sql
+
+    def run(self):
+        out = os.path.join(self.handler.output,
+                           str(random.random()))
+        config = {"OutputLocation": out}
+        query = f"unload ({self.sql}) to {repr(out)} " + \
+                f"with (format = '{self.DATA_STYLE}')"
+
+        resp = self.athena.start_query_execution(QueryString=query,
+                                                 ResultConfiguration=config)
+        self.query_id = resp['QueryExecutionId']
+        return resp
+
+    def status(self):
+        return self.info()['Status']
+
+    def info(self):
+        res = self.athena.get_query_execution(QueryExecutionId=self.query_id)
+        return res['QueryExecution']
+
+    def results(self):
+        tiedot = self.info()
+        status = tiedot['Status']['State']
+
+        while status in ['QUEUED', 'RUNNING']:
+            time.sleep(5)
+            status = self.status()['State']
+
+        # Re-fetch the execution info: if we had to poll above, the original
+        # `tiedot` was captured before the manifest location existed.
+        tiedot = self.info()
+
+        if status == "SUCCEEDED":
+            # Because we're using `UNLOAD`, we get a manifest of the files
+            # that make up our data.
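+            # The manifest is a plain-text file listing the S3 URI of each
+            # result file, one per line -- hence the split("\n") below.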
+            files = self.manifest(tiedot).strip().split("\n")
+            local = [self.handler.download(f) for f in files]
+            self.ds = pa.dataset.dataset(local)
+
+            return self.ds
+        else:
+            return status  # canceled or error
+
+    def manifest(self, tiedot):
+        manif = tiedot['Statistics']['DataManifestLocation']
+        tmp = self.handler.download(manif)
+        with open(tmp, "r") as f:
+            return f.read()
+
diff --git a/create_view.sql b/create_view.sql
new file mode 100644
index 0000000..474993f
--- /dev/null
+++ b/create_view.sql
@@ -0,0 +1,12 @@
+-- auto-generated
+CREATE EXTERNAL TABLE IF NOT EXISTS `trajectories`.`original_kitware`
+LOCATION 's3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/'
+TBLPROPERTIES ('table_type' = 'DELTA');
+
+-- view to accommodate the fact that Athena can't handle timezones;
+-- required in order to unload the data in any format that's not CSV
+create or replace view trajectories.kitware AS
+select
+    agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude
+from "trajectories"."original_kitware"
+
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..3798007
--- /dev/null
+++ b/test.py
@@ -0,0 +1,15 @@
+import access as a
+import pprint
+
+pp = pprint.PrettyPrinter(indent=4)
+
+athena = a.Athena("hay", "s3://haystac-pmo-athena/")
+query = athena.query('select * from "trajectories"."kitware" limit 10')
+data = query.results()
+print(data.head(10))
+
+# Parquet requires every value to have a column, so scalar results must be
+# aliased; `as count` here creates a column called `count`.
+print(athena.query("select count(*) as count from trajectories.kitware").results().head(1))
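+
+# A sketch of further use (assumes pandas is installed; `to_table()`
+# materializes the whole pyarrow Dataset in memory):
+#   df = data.to_table().to_pandas()
+#   print(df.dtypes)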