shared_utils.py

# Databricks notebook source
import json
import time
import re
from pathlib import Path
from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from empowerspark.core.storage.datalake import DataLakeManager
from empowerspark.core.storage.delta_lake_handler import DeltaLakeHandler
from empowerspark.target.type.dynamics.data_client import DataClient
from empowerspark.target.type.dynamics import Dynamics

# COMMAND ----------


def get_secret(secret_name) -> dict:
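    """Fetch a secret from the primary key vault scope and parse it as JSON;
    the resulting dict is expected to carry keys such as dynamics_erp,
    dynamics_url, azure_tenant, client_id and client_secret."""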
    secret = dbutils.secrets.get("primary-key-vault-scope", secret_name)
    return json.loads(secret)


def get_rtk(erp) -> str:
    return dbutils.secrets.get("primary-key-vault-scope", f"jdbc-{erp}-rtk")


def get_dynamics_client(secret_name, num_of_clients) -> Dynamics:
    secret = get_secret(secret_name)
    erp = secret["dynamics_erp"]
    rtk = get_rtk(erp)

    print(
        f"""Creating Dynamics client for -> {erp} 
          url: {secret['dynamics_url']}
          azure_tenant: {secret['azure_tenant']}
          {secret_name} client_id: {secret['client_id']}"""
    )
    data_clients = []
    data_client = Dynamics.create_client(
        erp,
        secret["dynamics_url"],
        secret["azure_tenant"],
        secret["client_id"],
        secret["client_secret"],
        rtk,
    )
    data_clients.append(data_client)

    # loop through the additional secrets and create one client per secret.
    # additional_secret_names is expected to be defined by the calling notebook
    # as "<prefix>$$<suffix>"; the index i is inserted between the two parts.
    for i in range(int(num_of_clients)):
        prefix, suffix = additional_secret_names.split("$$")
        secret_name = f"{prefix}{i}{suffix}"
        try:
            secret_value = dbutils.secrets.get("primary-key-vault-scope", secret_name)
            match = re.search(
                r"OAuthClientID=(.*?);OAuthClientSecret=(.*?);", secret_value
            )
            if match:
                client_id, client_secret = match.groups()
            else:
                print(
                    f"Error-1 at {secret_name}: could not parse OAuthClientID/OAuthClientSecret"
                )
                continue

            data_client = Dynamics.create_client(
                erp,
                secret["dynamics_url"],
                secret["azure_tenant"],
                client_id,
                client_secret,
                rtk,
            )
            data_clients.append(data_client)
        except Exception as e:
            print(f"Error-2 at {secret_name}: {e}")

    Dynamics.ARTIFACT_DB = ddu_log_db
    print(f"Created {len(data_clients)} Dynamics client(s). DDU log db: {ddu_log_db}")
    return Dynamics(data_clients)


def read_from_dynamics_table(tab_name) -> DataFrame:
    return dynamics_client.clients[0].read_table(tab_name)


def write_to_dynamics_table(df, target_table, key_columns, source_table, mode):
    dynamics_client.load_dataframe(df, target_table, key_columns, source_table, mode)


def truncate_dynamics_table(tab_name):
    dynamics_client.clients[0].truncate_table(tab_name)


def get_dynamics_quarantine_log(cat_name, db_name, tab_name) -> DataFrame:
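    """Summarise the quarantine table for the most recent DDU batch: one row per
    (__ddu_error, __ddu_run_id, __ddu_mode) with the count of quarantined records."""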
    spark.catalog.setCurrentCatalog(cat_name)
    query = f"""
        WITH  latest_batch as (
        SELECT run_id, MAX(end_time) as max_run_date
        FROM {ddu_log_db}.ddu_batch_log
        GROUP BY run_id
        ORDER BY max_run_date DESC
        LIMIT 1 
        )
        , log_table as (select * from {ddu_log_db}.{tab_name})

        --SELECT a.* 
        SELECT  a.__ddu_error, a.__ddu_run_id, a.__ddu_mode, count(*)
        FROM log_table a 
        INNER JOIN latest_batch b 
        on b.run_id = a.__ddu_run_id
        group by a.__ddu_error, a.__ddu_run_id, a.__ddu_mode
    """
    print(query)
    df = spark.sql(query)
    return df


def get_dynamics_batch_log(cat_name, db_name, tab_name, target_table) -> DataFrame:
    query = f"""
        SELECT * FROM {cat_name}.{db_name}.{tab_name}
        WHERE table_name = '{target_table}'
        ORDER BY log_id DESC
    """
    print(query)
    df = spark.sql(query)
    return df


def get_dynamics_quarantine_log_for_run_id(
    cat_name, db_name, tab_name, run_id
) -> DataFrame:
    query = f"""select * from {cat_name}.{db_name}.{tab_name} where __ddu_run_id = '{run_id}' """
    print(query)
    df = spark.sql(query)
    return df


def read_from_delta_table(cat_name, db_name, tab_name) -> DataFrame:
    return spark.table(f"{cat_name}.{db_name}.{tab_name}")


def save_as_delta_table(df, cat_name, db_name, tab_name):
    drop_delta_table(cat_name, db_name, tab_name)
    dlh.write_delta_table(df, tab_name, db_name, mode="overwrite", mergeschema=True)


def drop_delta_table(cat_name, db_name, tab_name):
    spark.catalog.setCurrentCatalog(cat_name)
    dlh.drop_table(tab_name, db_name, rm_files=True)


def transform_table(params):
    catalog_name = params["catalog_name"]
    db_name = params["db_name"]
    table_name = params["table_name"]
    target_table = params["target_table"]
    log_db = params["log_db"]
    print(
        f"""
        catalog_name: {catalog_name}
        Stage database name: {db_name}
        Stage table name: {table_name}
        Target table name: {target_table}
        Publishing log db name: {log_db}
        Publishing log table name: ddu_batch_log
        Transformation: transform_{target_table}
        """
    )

    print(
        f"--> 1. Running notebook ./custom_transformations/transform_{target_table} to transform {target_table} table"
    )
    # the calling notebook is expected to define transform_<target_table>(),
    # which returns (DataFrame, key_columns, mode)
    df, key_columns, mode = globals()[f"transform_{target_table}"]()
    # df.display()

    print(
        f"--> 2. Writing to delta staging table: {catalog_name}.{db_name}.{table_name}"
    )
    # clear any previous copy of this table in the publishing log database
    # (the quarantine table left by an earlier run)
    drop_delta_table(catalog_name, log_db, table_name)
    save_as_delta_table(df, catalog_name, db_name, table_name)
    return key_columns, mode

def publish_to_dynamics(params):
    catalog_name = params["catalog_name"]
    db_name = params["db_name"]
    table_name = params["table_name"]
    target_table = params["target_table"]
    key_columns = params["key_columns"]
    mode = params["mode"]
    log_db = params["log_db"]
    print(
        f"""
        catalog_name: {catalog_name}
        Stage database name: {db_name}
        Stage table name: {table_name}
        Target table name: {target_table}
        Key columns: {key_columns}
        Mode: {mode}
        Publishing log db name: {log_db}
        Publishing batch log table name: {log_db}.ddu_batch_log
        Quarantine table name: {log_db}.{target_table}
        Transformation: transform_{target_table}
        """
    )

    df = read_from_delta_table(catalog_name, db_name, table_name)

    print(f"--> 3. Writing to dynamics table: {target_table}")
    write_to_dynamics_table(df, target_table, key_columns, table_name, mode)

    ddu_db_name, ddu_log_table_name = log_db, "ddu_batch_log"
    print(
        f"--> 4. Printing batch log for {target_table}: {ddu_db_name}.{ddu_log_table_name}"
    )
    df = get_dynamics_batch_log(
        catalog_name, ddu_db_name, ddu_log_table_name, target_table
    )
    df.display()

    ddu_db_name, quarantine_tab_name = log_db, target_table
    print(
        f"--> 5. Printing quarantine logs for {target_table}: {ddu_db_name}.{quarantine_tab_name}"
    )
    df = get_dynamics_quarantine_log(catalog_name, ddu_db_name, quarantine_tab_name)
    df.display()


def transform_and_publish(params):
    key_columns, mode = transform_table(params)
    params["key_columns"] = key_columns
    params["mode"] = mode
    publish_to_dynamics(params)
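
# Illustrative usage (assumed table, column and mode names): a calling notebook %runs
# this utility, defines a transform_<target_table>() function that returns
# (DataFrame, key_columns, mode), and then calls transform_and_publish, e.g.:
#
#   def transform_salesorders():
#       df = read_from_delta_table(catalog_name, db_name, "salesorders_raw")
#       df = df.withColumnRenamed("order_no", "salesordernumber")
#       return df, ["salesordernumber"], "upsert"
#
#   transform_and_publish({
#       "catalog_name": catalog_name,
#       "db_name": db_name,
#       "table_name": "salesorders",
#       "target_table": "salesorders",
#       "log_db": ddu_log_db,
#   })
#
# ("upsert" is only an example; use whichever modes the Dynamics loader supports.)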

# COMMAND ----------

catalog_name = dbutils.widgets.get("catalog_name")
pTableName = dbutils.widgets.get("source_table")

# COMMAND ----------

spark.catalog.setCurrentCatalog(catalog_name)
db_name, table_name = pTableName.split(".")
dlh = DeltaLakeHandler()
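# secret_name, num_of_dynamics_clients, additional_secret_names and ddu_log_db are
# expected to be provided by the calling notebook (e.g. via widgets or a preceding %run).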
dynamics_client = get_dynamics_client(secret_name, num_of_dynamics_clients)

# COMMAND ----------

target_table = table_name
params = {
    "catalog_name": catalog_name,
    "db_name": db_name,
    "table_name": table_name,
    "target_table": target_table,
    "log_db": ddu_log_db,
}
transform_and_publish(params)