forked from bellwether/minerva
Add temp-file deletion to Query; add ls to S3
This commit is contained in:
parent
1d509f544c
commit
972e6dd210
2 changed files with 23 additions and 9 deletions
|
|
@ -39,6 +39,7 @@ class Execute:
|
||||||
self.sql = sql
|
self.sql = sql
|
||||||
self.params = [str(p) for p in params]
|
self.params = [str(p) for p in params]
|
||||||
self.info_cache = None
|
self.info_cache = None
|
||||||
|
self.files = []
|
||||||
|
|
||||||
def query(self):
    """Return the SQL text stored on this object in ``self.sql``."""
    statement = self.sql
    return statement
|
||||||
|
|
@ -96,13 +97,11 @@ class Query(Execute):
|
||||||
# Because we're using `UNLOAD`, we get a manifest of the files
|
# Because we're using `UNLOAD`, we get a manifest of the files
|
||||||
# that make up our data.
|
# that make up our data.
|
||||||
manif = self.info_cache['Statistics']['DataManifestLocation']
|
manif = self.info_cache['Statistics']['DataManifestLocation']
|
||||||
tmp = self.handler.s3.download(manif)
|
files = self.handler.s3.read(manif).split("\n")
|
||||||
with open(tmp, "r") as f:
|
|
||||||
txt = f.read()
|
|
||||||
|
|
||||||
files = txt.strip().split("\n")
|
|
||||||
files = [f.strip() for f in files if f.strip()] # filter empty
|
files = [f.strip() for f in files if f.strip()] # filter empty
|
||||||
|
|
||||||
|
self.files = files
|
||||||
|
|
||||||
return files
|
return files
|
||||||
else:
|
else:
|
||||||
print("Error")
|
print("Error")
|
||||||
|
|
@ -116,3 +115,13 @@ class Query(Execute):
|
||||||
self.ds = pa.dataset.dataset(local)
|
self.ds = pa.dataset.dataset(local)
|
||||||
return self.ds
|
return self.ds
|
||||||
|
|
||||||
|
def __enter__(self):
    """Enter a ``with`` block; the object itself becomes the ``as`` target."""
    return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type=None, exc_value=None, traceback=None):
    """Leave a ``with`` block by calling ``self.close()``.

    The context-manager protocol always invokes ``__exit__`` with three
    positional arguments describing the in-flight exception (all ``None``
    when the block finished normally). A signature that accepts only
    ``self`` makes every ``with`` statement fail with ``TypeError`` on exit.
    The parameters default to ``None`` so a direct ``obj.__exit__()`` call
    keeps working.

    Args:
        exc_type: Exception class raised inside the block, or ``None``.
        exc_value: Exception instance raised inside the block, or ``None``.
        traceback: Traceback of that exception, or ``None``.

    Returns:
        ``False``, so an exception raised inside the block is never
        suppressed.
    """
    self.close()
    return False
|
||||||
|
|
||||||
|
def close(self):
    """Delete every path listed in ``self.files`` and then empty the list.

    Safe to call more than once: a path that is already gone is skipped,
    and the list is cleared after a successful pass so a later call (for
    example an explicit ``close()`` followed by ``__exit__``) has nothing
    left to remove. Any other ``OSError`` propagates and leaves
    ``self.files`` untouched so the caller can retry.
    """
    # NOTE(review): os.remove only handles local filesystem paths. Confirm
    # that whatever populates self.files stores local paths rather than
    # remote URIs; a remote URI would be skipped here as "missing".
    for path in self.files:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already deleted (e.g. by an earlier close()); nothing to do.
            pass
    self.files = []
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,10 @@ class S3:
|
||||||
def __init__(self, handler):
    """Store ``handler`` and create S3 access objects from its session.

    ``handler.session`` is asked for both a ``client("s3")`` (kept as
    ``self.s3``) and a ``resource("s3")`` (kept as ``self.resource``).
    """
    self.handler = handler
    session = handler.session
    self.s3 = session.client("s3")
    self.resource = session.resource("s3")
|
||||||
|
|
||||||
def parse(self, uri):
    """Split a ``scheme://bucket/key`` URI into ``(bucket, key)``.

    The URI is split on ``/`` at most three times, so the key keeps any
    further slashes it contains. A URI that names only a bucket
    (``s3://bucket`` or ``s3://bucket/``) yields an empty key instead of
    failing on tuple unpacking, which lets callers use it as an
    "everything in the bucket" prefix.

    Args:
        uri: URI string of the form ``s3://bucket/path/to/object``.

    Returns:
        A ``(bucket, key)`` tuple of strings; ``key`` may be ``""``.

    Raises:
        ValueError: If no bucket component can be found in ``uri``.
    """
    parts = uri.split("/", 3)
    # parts == [scheme + ":", "", bucket, key]; the key is absent for
    # bucket-only URIs.
    if len(parts) < 3 or not parts[2]:
        raise ValueError(f"not a valid S3 URI: {uri!r}")
    bucket = parts[2]
    key = parts[3] if len(parts) > 3 else ""
    return bucket, key
|
||||||
|
|
||||||
def download(self, uri, loc=None):
|
def download(self, uri, loc=None):
|
||||||
|
|
@ -31,3 +31,8 @@ class S3:
|
||||||
bucket, key = self.parse(remote)
|
bucket, key = self.parse(remote)
|
||||||
return self.s3.upload_file(local, bucket, key)
|
return self.s3.upload_file(local, bucket, key)
|
||||||
|
|
||||||
|
def ls(self, uri, **kwargs):
    """List the objects stored under an S3 URI.

    ``uri`` is split by ``self.parse`` into a bucket name and a key; the
    key is used as the ``Prefix`` filter on that bucket's object
    collection. Extra keyword arguments are forwarded unchanged to
    ``objects.filter``, and its result is returned as-is.
    """
    bucket_name, prefix = self.parse(uri)
    objects = self.resource.Bucket(bucket_name).objects
    return objects.filter(Prefix=prefix, **kwargs)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue