diff --git a/minerva/__init__.py b/minerva/__init__.py
index d1318eb..5fbad8a 100644
--- a/minerva/__init__.py
+++ b/minerva/__init__.py
@@ -7,6 +7,7 @@
 from .s3 import S3
 from .docker import Docker
 from .machine import Machine
 from .pier import Pier
+from .pool import Pool
 from .minerva import Minerva
 
@@ -22,6 +23,7 @@ __all__ = [
     "cluster_pool",
     "load_template",
     "load_sql",
-    "AWS_INSTANCES"
+    "AWS_INSTANCES",
+    "Pool"
 ]
 
diff --git a/minerva/athena.py b/minerva/athena.py
index fcd0227..e600de9 100644
--- a/minerva/athena.py
+++ b/minerva/athena.py
@@ -67,24 +67,6 @@ class Athena:
 
         return e
 
-    # FIXME bad sql, can't drop multiple tables in athena
-    def delete_tables(self, db_name, tables):
-        e = Execute(self, f"drop table {', '.join(tables)}")
-        e.run()
-        e.finish()
-
-        try:
-            self.glue.batch_delete_table(DatabaseName = db_name,
-                                         TablesToDelete = tables)
-        finally:
-            pass
-
-        for table in tables:
-            s3_uri = os.path.join(self.output, table, "")
-            #print(f"deleting {s3_uri}")
-            self.handler.s3.rm(s3_uri)
-
-
     def cancel(self, query_id):
         return self.client.stop_query_execution(QueryExecutionId = query_id)
 
@@ -106,7 +88,11 @@
         e.run()
         e.finish()
 
-        # 2. In chunks
+        # 2. Run the Glue ETL job
+        return self.glue.start_job_run(
+            JobName = 'convert table to delta',
+            Arguments = {'--from': table,
+                         '--to': to})
 
 
     def describe_columns(self, table):
diff --git a/minerva/pool.py b/minerva/pool.py
new file mode 100644
index 0000000..a3e3683
--- /dev/null
+++ b/minerva/pool.py
@@ -0,0 +1,68 @@
+from threading import Thread, Lock
+
+class Pool:
+    """A pool of worker machines that drain a shared work queue in threads."""
+
+    def __init__(self, worker, num=1):
+        # TODO can move the creation into a thread, but that might be too
+        # many concurrent requests for AWS
+        self.machines = [worker(i).create() for i in range(num)]
+        self.mutex = None
+
+        for machine in self.machines:
+            machine.join()
+            machine.login()
+
+    def run(self, func, data=None):
+        """Call func(machine, item) for every item in data, spread across machines."""
+        if not data or not func:
+            return
+
+        # We'll be modifying this, don't mess with the original
+        self.mutex = Lock()
+        data = data.copy()
+
+        # All threads are sharing the same `data` and access is controlled by a mutex
+        threads = [Thread(target=self.process_queue, args=(machine, func, data))
+                   for machine in self.machines]
+
+        # Start the threads
+        for thread in threads:
+            thread.start()
+
+        # Wait for the workers to finish
+        # TODO maybe return STDOUT from everything?
+        for thread in threads:
+            thread.join()
+
+
+    def process_queue(self, machine, func, data):
+        """Worker loop: pop items off the shared queue until it is empty."""
+        self.mutex.acquire()
+        while data:
+            item = data.pop()
+            if isinstance(item, list):
+                print(f"i'm doing work with [{min(item)}..{max(item)}] on {machine}")
+            else:
+                print(f"i'm doing work with {item} on {machine}")
+            self.mutex.release()
+
+            # do the work
+            func(machine, item)
+            #time.sleep(0.5)
+
+            # prior to return to the while-loop check
+            self.mutex.acquire()
+
+        self.mutex.release() # we're done!
+
+
+    def terminate(self):
+        """Terminate every machine in the pool."""
+        for mach in self.machines:
+            mach.terminate()
+
+
+    def cost(self):
+        """Total cost of all machines in the pool."""
+        return sum([mach.cost() for mach in self.machines])