shared_utils.py
# Databricks notebook source
import json
import time
import re
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from empowerspark.core.storage.datalake import DataLakeManager
from empowerspark.core.storage.delta_lake_handler import DeltaLakeHandler
from empowerspark.target.type.dynamics.data_client import DataClient
from empowerspark.target.type.dynamics import Dynamics
# COMMAND ----------

# Fetch a secret from the primary key vault scope and parse it as a JSON document.
def get_secret(secret_name) -> dict:
    secret = dbutils.secrets.get("primary-key-vault-scope", secret_name)
    return json.loads(secret)


# Fetch the RTK secret registered for the given ERP.
def get_rtk(erp) -> str:
    return dbutils.secrets.get("primary-key-vault-scope", f"jdbc-{erp}-rtk")
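
# Illustrative usage (the secret name below is a placeholder, not a real vault entry):
# conn = get_secret("dynamics-connection-secret")
# rtk = get_rtk(conn["dynamics_erp"])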

def get_dynamics_client(secret_name, num_of_clients) -> Dynamics:
    secret = get_secret(secret_name)
    erp = secret["dynamics_erp"]
    rtk = get_rtk(erp)
    print(
        f"""Creating Dynamics client for -> {erp}
        url: {secret['dynamics_url']}
        azure_tenant: {secret['azure_tenant']}
        {secret_name} client_id: {secret['client_id']}"""
    )
    data_clients = []
    data_client = Dynamics.create_client(
        erp,
        secret["dynamics_url"],
        secret["azure_tenant"],
        secret["client_id"],
        secret["client_secret"],
        rtk,
    )
    data_clients.append(data_client)
    # Loop through the additional secrets and create one client per secret.
    # additional_secret_names is a "<prefix>$$<suffix>" pattern supplied by the notebook configuration.
    for i in range(int(num_of_clients)):
        format_secret_name = additional_secret_names.split("$$")
        secret_name = f"{format_secret_name[0]}{i}{format_secret_name[1]}"
        try:
            secret_value = dbutils.secrets.get("primary-key-vault-scope", secret_name)
            match = re.search(
                r"OAuthClientID=(.*?);OAuthClientSecret=(.*?);", secret_value
            )
            if not match:
                print(f"Error-1 at {secret_name}: could not parse OAuth client id/secret")
                continue
            client_id, client_secret = match.groups()
            # print(f"{i} {secret_name} client_id: {client_id}")
            data_client = Dynamics.create_client(
                erp,
                secret["dynamics_url"],
                secret["azure_tenant"],
                client_id,
                client_secret,
                rtk,
            )
            data_clients.append(data_client)
        except Exception as e:
            print(f"Error-2 at {secret_name}: {e}")
    Dynamics.ARTIFACT_DB = ddu_log_db
    print(f"Created {len(data_clients)} Dynamics client(s). DDU log db: {ddu_log_db}")
    return Dynamics(data_clients)
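
# The connection secret is expected to be a JSON document with at least the keys read
# above in get_dynamics_client (values below are placeholders):
# {
#     "dynamics_erp": "...",
#     "dynamics_url": "https://...",
#     "azure_tenant": "...",
#     "client_id": "...",
#     "client_secret": "..."
# }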

def read_from_dynamics_table(tab_name) -> DataFrame:
    return dynamics_client.clients[0].read_table(tab_name)


def write_to_dynamics_table(df, target_table, key_columns, source_table, mode):
    dynamics_client.load_dataframe(df, target_table, key_columns, source_table, mode)


def truncate_dynamics_table(tab_name):
    dynamics_client.clients[0].truncate_table(tab_name)
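
# Illustrative usage of the Dynamics helpers (table, key column and mode values are
# placeholders; key_columns and mode follow whatever the transform_<table> functions return):
# src_df = read_from_dynamics_table("salesorderheaders")
# write_to_dynamics_table(src_df, "salesorderheaders", ["salesorderid"], "stage_salesorderheaders", "upsert")
# truncate_dynamics_table("salesorderheaders")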

def get_dynamics_quarantine_log(cat_name, db_name, tab_name) -> DataFrame:
    spark.catalog.setCurrentCatalog(cat_name)
    query = f"""
    WITH latest_batch AS (
        SELECT run_id, MAX(end_time) AS max_run_date
        FROM {db_name}.ddu_batch_log
        GROUP BY run_id
        ORDER BY max_run_date DESC
        LIMIT 1
    )
    , log_table AS (SELECT * FROM {db_name}.{tab_name})
    --SELECT a.*
    SELECT a.__ddu_error, a.__ddu_run_id, a.__ddu_mode, COUNT(*)
    FROM log_table a
    INNER JOIN latest_batch b
        ON b.run_id = a.__ddu_run_id
    GROUP BY a.__ddu_error, a.__ddu_run_id, a.__ddu_mode
    """
    print(query)
    df = spark.sql(query)
    return df

def get_dynamics_batch_log(cat_name, db_name, tab_name, target_table) -> DataFrame:
    # df = spark.sql(f"select * from {cat_name}.{db_name}.{tab_name}").orderBy(col("log_id").desc())
    query = f"""SELECT * FROM {cat_name}.{db_name}.{tab_name} WHERE table_name = '{target_table}' ORDER BY log_id DESC"""
    print(query)
    df = spark.sql(query)
    return df

def get_dynamics_quarantine_log_for_run_id(
    cat_name, db_name, tab_name, run_id
) -> DataFrame:
    query = f"""SELECT * FROM {cat_name}.{db_name}.{tab_name} WHERE __ddu_run_id = '{run_id}'"""
    print(query)
    df = spark.sql(query)
    return df
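
# Illustrative usage of the log helpers (catalog, database, table and run_id values are placeholders):
# get_dynamics_batch_log("my_catalog", "ddu_log", "ddu_batch_log", "salesorderheaders").display()
# get_dynamics_quarantine_log("my_catalog", "ddu_log", "salesorderheaders").display()
# get_dynamics_quarantine_log_for_run_id("my_catalog", "ddu_log", "salesorderheaders", "<run_id>").display()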

def read_from_delta_table(cat_name, db_name, tab_name) -> DataFrame:
    return spark.table(f"{cat_name}.{db_name}.{tab_name}")


def save_as_delta_table(df, cat_name, db_name, tab_name):
    drop_delta_table(cat_name, db_name, tab_name)
    dlh.write_delta_table(df, tab_name, db_name, mode="overwrite", mergeschema=True)


def drop_delta_table(cat_name, db_name, tab_name):
    spark.catalog.setCurrentCatalog(cat_name)
    dlh.drop_table(tab_name, db_name, rm_files=True)
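
# Illustrative round trip through the delta helpers (names are placeholders):
# save_as_delta_table(df, "my_catalog", "stage_db", "my_table")
# staged = read_from_delta_table("my_catalog", "stage_db", "my_table")
# drop_delta_table("my_catalog", "stage_db", "my_table")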

def transform_table(params):
    catalog_name = params["catalog_name"]
    db_name = params["db_name"]
    table_name = params["table_name"]
    target_table = params["target_table"]
    log_db = params["log_db"]
    print(
        f"""
        catalog_name: {catalog_name}
        Stage database name: {db_name}
        Stage table name: {table_name}
        Target table name: {target_table}
        Publishing log db name: {log_db}
        Publishing log table name: ddu_batch_log
        Transformation: transform_{target_table}
        """
    )
    print(
        f"--> 1. Running notebook ./custom_transformations/transform_{target_table} to transform {target_table} table"
    )
    df, key_columns, mode = globals()[f"transform_{target_table}"]()
    # df.display()
    print(
        f"--> 2. Writing to delta staging table: {catalog_name}.{db_name}.{table_name}"
    )
    # Clear the previous copy of this table in the publishing log database, then stage
    # the transformed dataframe as a delta table.
    drop_delta_table(catalog_name, log_db, table_name)
    save_as_delta_table(df, catalog_name, db_name, table_name)
    return key_columns, mode
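
# transform_table resolves a function named transform_<target_table> from the notebook's
# globals (defined in the ./custom_transformations notebooks) and expects it to return the
# transformed dataframe, the key columns and the publish mode. A minimal sketch of such a
# function (names and the mode value are placeholders):
#
# def transform_salesorderheaders():
#     df = read_from_delta_table("my_catalog", "raw_db", "salesorderheaders")
#     df = df.withColumnRenamed("id", "salesorderid")
#     return df, ["salesorderid"], "upsert"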

def publish_to_dynamics(params):
    catalog_name = params["catalog_name"]
    db_name = params["db_name"]
    table_name = params["table_name"]
    target_table = params["target_table"]
    key_columns = params["key_columns"]
    mode = params["mode"]
    log_db = params["log_db"]
    print(
        f"""
        catalog_name: {catalog_name}
        Stage database name: {db_name}
        Stage table name: {table_name}
        Target table name: {target_table}
        Key columns: {key_columns}
        Mode: {mode}
        Publishing log db name: {log_db}
        Publishing batch log table name: {log_db}.ddu_batch_log
        Quarantine table name: {log_db}.{target_table}
        Transformation: transform_{target_table}
        """
    )
    df = read_from_delta_table(catalog_name, db_name, table_name)
    print(f"--> 3. Writing to dynamics table: {target_table}")
    write_to_dynamics_table(df, target_table, key_columns, table_name, mode)

    ddu_db_name, ddu_log_table_name = log_db, "ddu_batch_log"
    print(
        f"--> 4. Printing batch log for {target_table}: {ddu_db_name}.{ddu_log_table_name}"
    )
    df = get_dynamics_batch_log(
        catalog_name, ddu_db_name, ddu_log_table_name, target_table
    )
    df.display()

    ddu_db_name, quarantine_tab_name = log_db, target_table
    print(
        f"--> 5. Printing quarantine logs for {target_table}: {ddu_db_name}.{quarantine_tab_name}"
    )
    df = get_dynamics_quarantine_log(catalog_name, ddu_db_name, quarantine_tab_name)
    df.display()

def transform_and_publish(params):
    key_columns, mode = transform_table(params)
    params["key_columns"] = key_columns
    params["mode"] = mode
    publish_to_dynamics(params)
# COMMAND ----------
catalog_name = dbutils.widgets.get("catalog_name")
pTableName = dbutils.widgets.get("source_table")
# The values below are also required by this notebook but are not defined anywhere in it.
# They are assumed here to arrive as widgets named after the variables; adjust if your
# job supplies them differently (for example via a config notebook).
secret_name = dbutils.widgets.get("secret_name")
num_of_dynamics_clients = dbutils.widgets.get("num_of_dynamics_clients")
additional_secret_names = dbutils.widgets.get("additional_secret_names")
ddu_log_db = dbutils.widgets.get("ddu_log_db")
# COMMAND ----------
spark.catalog.setCurrentCatalog(catalog_name)
db_name, table_name = pTableName.split(".")
dlh = DeltaLakeHandler()
dynamics_client = get_dynamics_client(secret_name, num_of_dynamics_clients)
# COMMAND ----------
target_table = table_name
params = {"catalog_name": catalog_name, "db_name": db_name, "table_name": table_name, "target_table": target_table, "log_db": ddu_log_db}
transform_and_publish(params)