"""Example: copy an Athena table into a new partitioned external table.

The script reads the column list of ``table``, creates the external table
``out`` at ``dest`` partitioned by ``partition_by``, then fills it with a
series of ``insert into ... select`` statements, each covering a window of
``concurrent`` partition values.  The exported table is deleted again in the
``finally`` clause, whether or not the export succeeded.
"""
import minerva
import math

m = minerva.Minerva("hay-te")
athena = m.athena("s3://haystac-te-athena/")

partition_by = "chunk"
table = "haystacdb.kitware_med"
cols = athena.describe_columns(table)

out = "haystacdb.exported_table"
dest = "s3://haystac-te-athena/test_export/"

# Have to use different datatypes here: the "long" and "integer" type names
# reported by describe_columns are both rewritten to "int" for the DDL.
# NOTE(review): mapping "long" to "int" may narrow 64-bit values -- confirm
# that "int" (rather than "bigint") is really what is wanted.
cols = [(name, "int") if dtype in ("long", "integer") else (name, dtype)
        for name, dtype in cols]

# Split the schema into the partition column and everything else; the DDL
# declares them separately.
part_matches = [col for col in cols if col[0] == partition_by]
if not part_matches:
    raise ValueError(
        f"partition column {partition_by!r} not found in {table}")
part_col = part_matches[0]
rest_cols = [col for col in cols if col[0] != partition_by]

column_defs = ', '.join(' '.join(col) for col in rest_cols)
partition_def = ' '.join(part_col)

string = f"""
create external table {out}({column_defs})
partitioned by ({partition_def})
location '{dest}'
tblproperties ('spark.sql.sources.provider' = 'delta',
               'parquet.compression' = 'zstd')
"""
print(string)

try:
    e = athena.execute(string)
    e.finish()

    count = athena.query(
        f"select count(distinct {partition_by}) as count from {table}"
    ).scalar()

    concurrent = 100  # 100 is max concurrent writers allowed

    # Build the select list once, as a string.  (A bare ``map`` object would
    # be exhausted by the first join and yield an empty column list for every
    # later batch.)  The order mirrors the target table definition above:
    # regular columns first, partition column last.
    select_list = ', '.join(name for name, _ in rest_cols + [part_col])

    # NOTE(review): the windows below assume the partition values are the
    # integers 0 .. count-1 -- confirm against the source table.
    for i in range(0, count, concurrent):
        print(i)
        sql = (
            f"insert into {out} select {select_list} from {table} "
            f"where {partition_by} >= {i} and {partition_by} < {i + concurrent}"
        )
        print(sql)
        q = athena.query(sql)
        q.finish()

finally:
    # NOTE(review): ``out`` is already database-qualified; confirm that
    # delete_table expects the qualified name as its second argument.
    athena.delete_table("haystacdb", out)