From 972e6dd21011516de33a1440c112b1887551dc0c Mon Sep 17 00:00:00 2001
From: Ari Brown
Date: Mon, 7 Aug 2023 18:10:10 -0400
Subject: [PATCH] add tempfile deletion to query; ls to s3

---
Review notes (free-text area; discarded by `git am`):

  * Query.__exit__ now accepts (exc_type, exc_value, traceback). The `with`
    statement always passes these three arguments, so the one-argument
    signature raised TypeError when the block exited and close() never ran.
    The method still returns None, so exceptions from the body propagate.
  * NOTE(review): Query.close() calls os.remove() on every entry of
    self.files, which is filled from the manifest read via s3.read(). If
    those entries are s3:// URIs rather than local paths, os.remove() will
    fail -- confirm what the manifest contains.
  * NOTE(review): confirm `os` is imported in minerva/athena.py; the import
    is not visible in these hunks.
  * Line structure and indentation were reconstructed from a
    whitespace-collapsed copy; the `index` blob hash for athena.py predates
    the __exit__ signature fix.

 minerva/athena.py | 19 ++++++++++++++-----
 minerva/s3.py     | 13 +++++++++----
 2 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/minerva/athena.py b/minerva/athena.py
index ac2ccc5..fe059a6 100644
--- a/minerva/athena.py
+++ b/minerva/athena.py
@@ -39,6 +39,7 @@ class Execute:
         self.sql = sql
         self.params = [str(p) for p in params]
         self.info_cache = None
+        self.files = []
 
     def query(self):
         return self.sql
@@ -96,13 +97,11 @@ class Query(Execute):
             # Because we're using `UNLOAD`, we get a manifest of the files
             # that make up our data.
             manif = self.info_cache['Statistics']['DataManifestLocation']
-            tmp = self.handler.s3.download(manif)
-            with open(tmp, "r") as f:
-                txt = f.read()
-
-            files = txt.strip().split("\n")
+            files = self.handler.s3.read(manif).split("\n")
             files = [f.strip() for f in files if f.strip()] # filter empty
 
+            self.files = files
+
             return files
         else:
             print("Error")
@@ -116,3 +115,13 @@ class Query(Execute):
         self.ds = pa.dataset.dataset(local)
         return self.ds
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.close()
+
+    def close(self):
+        for file in self.files:
+            os.remove(file)
+
diff --git a/minerva/s3.py b/minerva/s3.py
index 4a694ce..e4010fb 100644
--- a/minerva/s3.py
+++ b/minerva/s3.py
@@ -4,12 +4,12 @@ import random
 
 class S3:
     def __init__(self, handler):
-        self.handler = handler
-        self.s3 = handler.session.client("s3")
+        self.handler  = handler
+        self.s3       = handler.session.client("s3")
+        self.resource = handler.session.resource("s3")
 
     def parse(self, uri):
-        bucket = uri.split("/")[2]
-        file = os.path.join(*uri.split("/")[3:])
+        _, _, bucket, file = uri.split("/", 3)
         return bucket, file
 
     def download(self, uri, loc=None):
@@ -31,3 +31,8 @@ class S3:
         bucket, key = self.parse(remote)
         return self.s3.upload_file(local, bucket, key)
 
+    def ls(self, uri, **kwargs):
+        bucket, key = self.parse(uri)
+        #self.s3.list_objects_v2(Bucket=bucket, Prefix=key)
+        return self.resource.Bucket(bucket).objects.filter(Prefix=key, **kwargs)
+