From 0f033a54b7553f0c54f17c61a984500c184236a6 Mon Sep 17 00:00:00 2001 From: Ari Brown Date: Thu, 27 Jul 2023 13:46:44 -0400 Subject: [PATCH] Initial commit --- __pycache__/access.cpython-310.pyc | Bin 0 -> 3069 bytes access.py | 82 +++++++++++++++++++++++++++++ create_view.sql | 12 +++++ test.py | 15 ++++++ 4 files changed, 109 insertions(+) create mode 100644 __pycache__/access.cpython-310.pyc create mode 100644 access.py create mode 100644 create_view.sql create mode 100644 test.py diff --git a/__pycache__/access.cpython-310.pyc b/__pycache__/access.cpython-310.pyc new file mode 100644 index 0000000000000000000000000000000000000000..52831fe3342b97ccb21704365ea996095d2d8fa8 GIT binary patch literal 3069 zcmZuzTW=f36`q;BlBQ%Oa-7(`F-_Wda}k>wMO)N0Y*<1Nz-1CMLXiauV$E6EOH*8G zW>{4O2^5fv`o%5!To@o9{WtnE=Cx1y3x25komo_Zb z3nqOTJZ3UD7HJ*5P)6uQ=v|P!!;<&}C$aX451DWq7DJg2O^*i8J_4OvDEdQ)oRKn6 zgiG<5K@m^-sBw@CQ3o=jjcs__Jj{n#k1n-(s7>V#ioOP6SfBUBluQ1UPgxhG&*z>L z3+nk(0f`G@Kd+tev@ufUUNOkK!)!mdVdf4>J|JLZp#@%i)EKIlCz_&Vt*x>8D&L2V9P5H5yiK&?L z1>cHJY2zt+oJXh-a4VnpU|x*LCfrqS`&ZkelfH{Uk6SCJsK3oic(vD1Pfcg zUt%&;*-)1ID#B+fhDg_Bc{m)Dnd}kOa4n&%+qLy>cfXWlz^-c>-R{9S8&orD4cn+q5{mJV{sbFSk^Ee$-2CNI+hK& zhPo!#Vfl;dBDMj@GX>Y1H)u$wInTd`#+2v=*{?`Qv_qWfFtv_J5GHyEXAE-Xh@=Q& zODw7n&~^i_Uz}|7K8nL)2rPmX>kiAWNWn3`dI*U`Ag9wF8WbAl(4_D6eG`~)9xm81 zUx*ok#grF(CM3;9K%r-1(1)YtKMQ1l(Hh#}Gw)rX6}@`@0#GSr^i%+?zKpMcaNUMO zXsw;Df8_$h-LjV%gpWx)b}(yRn*>SIl+72sIoUWY%){ojy;ALGruqBkM-%Z;k|2z_ zD|L)2h7Z!sd$}GD%$;($S3DRix0~H?D|Y^p_r|pQmoiyXS8zXdmBbH7I3rRN0=11& zKcq1-vh`IyQntQ*``g>y&bQy)-Ej^`-$6uD21o?oyZLHo8&?B`Tyi7z5b@Hm=hSgH zO|hYVof~QcNBtZ{6BHV7l>agomwAI~Pw?<%UHE)bbNARn9O|9*8?{fdM3A4dIa?r( zT%oWn+BWQ%%#1a%i(d@)%ICb|?JKgl;1%LZSoKR%gSKlRa#aUb6X(vP1s;mSio-AK z11<10lG0N?13oB*Ez^ej8J4CM_Dzq36qBN;!Ar`|SLGV=}o0_HIyF@M6kwI^Ui$0e1d)>a=w zk2D~+{9FO-np~0v%RxzdHvj#$oRB`;+{jy1GxN8 z@eBv`*&L={@PCN|OCOsp7X z2$z}3bZ+niqnAXnp(%1%n(EVll8Cl}JFe2w>FZRD00&T9{}$W1+sEKZEb1ix7yFmI zxA6lI3M7@abK1X=c_|+_{I=1cAD~k-RA$64(9AQ)rqVkW|7=j)B24@dWRuLtd+c!_ z{Bx$8hd>Oy-=VmlCBL24$-OXB(bMUrZQS?|OM9%MoaHLa$}CZrcSN^nUPD^Zs+e)$i_FS--DYJ3iTxjm*)`{s<$E5 I#oL?z2SO&3rT_o{ literal 0 HcmV?d00001 diff --git a/access.py b/access.py new file mode 100644 index 0000000..2befd90 --- /dev/null +++ b/access.py @@ -0,0 +1,82 @@ +import boto3 +import os +import random +import time +import pyarrow as pa +import pyarrow.dataset +import pprint + +pp = pprint.PrettyPrinter(indent=4) + +class Athena: + def __init__(self, profile, output): + self.session = boto3.session.Session(profile_name=profile) + self.athena = self.session.client("athena") + self.output = output + + def query(self, sql): + q = Query(self, sql) + q.run() + return q + + def download(self, s3): + bucket = s3.split("/")[2] + file = os.path.join(*s3.split("/")[3:]) + tmp = f"/tmp/{random.random()}.bin" + self.session.client('s3').download_file(bucket, file, tmp) + + return tmp + +class Query: + DATA_STYLE = 'parquet' + + def __init__(self, handler, sql): + self.handler = handler + self.athena = handler.athena + self.sql = sql + + def run(self): + out = os.path.join(self.handler.output, + str(random.random())) + config = {"OutputLocation": out} + query = f"unload ({self.sql}) to {repr(out)} " + \ + f"with (format = '{self.DATA_STYLE}')" + + resp = self.athena.start_query_execution(QueryString=query, + ResultConfiguration=config) + self.query_id = resp['QueryExecutionId'] + return resp + + def status(self): + return self.info()['Status'] + + def info(self): + res = self.athena.get_query_execution(QueryExecutionId=self.query_id) + return res['QueryExecution'] + + def results(self): + tiedot = self.info() + status = tiedot['Status']['State'] + + while status in ['QUEUED', 'RUNNING']: + time.sleep(5) + status = self.status()['State'] + + if status == "SUCCEEDED": + # Because we're using `UNLOAD`, we get a manifest of the files + # that make up our data. + files = self.manifest(tiedot).strip().split("\n") + local = [self.handler.download(f) for f in files] + self.ds = pa.dataset.dataset(local) + + return self.ds + else: + return status # canceled or error + + def manifest(self, tiedot): + manif = tiedot['Statistics']['DataManifestLocation'] + tmp = self.handler.download(manif) + with open(tmp, "r") as f: + return f.read() + + diff --git a/create_view.sql b/create_view.sql new file mode 100644 index 0000000..474993f --- /dev/null +++ b/create_view.sql @@ -0,0 +1,12 @@ +-- auto-generated +CREATE EXTERNAL TABLE IF NOT EXISTS `trajectories`.`original_kitware` +LOCATION 's3://haystac-archive-phase1.trial1/ta1.kitware/ta1/simulation/train/' +TBLPROPERTIES ('table_type' = 'DELTA'); + +-- view to accomodate for the fact that Athena can't handle timezones +-- required in order to unload the data in any format that's not CSV +create or replace view trajectories.kitware AS +select + agent, from_unixtime(cast(to_unixtime(timestamp) AS bigint)) as timestamp, latitude, longitude +from "trajectories"."original_kitware" + diff --git a/test.py b/test.py new file mode 100644 index 0000000..3798007 --- /dev/null +++ b/test.py @@ -0,0 +1,15 @@ +import access as a +import pprint + +pp = pprint.PrettyPrinter(indent=4) + +athena = a.Athena("hay", "s3://haystac-pmo-athena/") +query = athena.query('select * from "trajectories"."kitware" limit 10') +data = query.results() +print(data.head(10)) + +# Everything *needs* to have a column in order for parquet to work, so scalar +# values have to be assigned something, so here we use `as count` to create +# a temporary column called `count` +print(athena.query("select count(*) as count from trajectories.kitware").results().head(1)) +