DDU Tooling

This section describes the tools available in the DDU library for building DDU applications. These details can also be used to build custom DDU applications outside of the Empower Publishing framework.

Data Clients

The DataClient class provides an interface for interacting directly with the Dynamics 365 environment. The class is initialized with either the required credentials or a complete JDBC connection string.

from empowerspark.target.type.dynamics.data_client import DataClient

erp = ...  # Either "dynamicscrm" or "dynamics365"
dynamics_url = ...  # The URL of the Dynamics environment
tenant_id = ...  # The Azure tenant ID
client_id = ...  # The Azure AD application ID
client_secret = ...  # The Azure AD application secret
rtk = ...  # The connector runtime key

# Initialize a data client using credentials
client = DataClient.from_parameters_with_erp(erp, dynamics_url, tenant_id, client_id, client_secret, rtk)

# Initialize a CRM or FNO client directly
from empowerspark.target.type.dynamics.data_client import CRMClient
client = CRMClient.from_parameters(dynamics_url, tenant_id, client_id, client_secret, rtk)

from empowerspark.target.type.dynamics.data_client import FNOClient
client = FNOClient.from_parameters(dynamics_url, tenant_id, client_id, client_secret, rtk)

# Advanced - Initialize using a custom JDBC connection string
jdbc_connection_string = "jdbc:dynamicscrm:;..."
client = CRMClient(jdbc_connection_string)

Once the client is created, it can be used to execute CRUD operations with the Dynamics environment.

# Get a list of available tables. Note: read operations return Spark DataFrames.
client.read_table("sys_tables")

# Read a particular table
client.read_table(table_name)

# Execute a SQL query directly
client.query("SELECT column1, column2 FROM table WHERE column3 = 'value'")

# Append a dataframe to a table
client.append_table(df, table_name)

# Insert a new record
new_record = {"column1": "value1", "column2": "value2"}
client.insert_record(new_record, table_name)

# Update a record
record_update = {"column1": "value3", "column2": "value4"}
key_columns = ["id"]
client.update_record(record_update, table_name, key_columns)

# Delete records
client.delete_from_table(table_name, "id", [id1, id2, id3])
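
Since read operations return Spark DataFrames, the results can be refined with standard DataFrame operations. A minimal sketch; the "account" table and its column names are illustrative:

# List the available tables and inspect the result.
tables_df = client.read_table("sys_tables")
tables_df.show()

# Read a table, then filter and project it with ordinary Spark operations.
accounts = client.read_table("account")
active = accounts.filter(accounts["statecode"] == 0)
active.select("accountid", "name").show()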

Dynamics

The Dynamics class provides a high-level interface for executing data migrations or integrations with a Dynamics environment. The tool manages a pool of threads that load data to the Dynamics environment in parallel. Data from a source (see the options below) is compared to the target table to identify which incoming records are inserts, updates, unchanged, or invalid. Each record is then pushed individually to the target table so that each record can fail independently of the others.

As before, there are multiple ways to initialize the class depending on the use case.

from empowerspark.target.type.dynamics import Dynamics

# Initialize with a DataClient
migrator = Dynamics(client)

# If multiple clients are being used, they can be passed into the constructor
# or appended after initialization.
migrator = Dynamics([client1, client2])
migrator.add_client(client3)
migrator.add_client(client4)

# Initialize directly from a key vault secret
secret = keyvault.get_secret(secret_name)
migrator = Dynamics()
migrator.configure_secrets(secret)
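
Putting the pieces together, a typical flow builds a data client, wraps it in a Dynamics instance, and calls one of the load methods described below. A minimal sketch reusing the CRMClient from the previous section:

from empowerspark.target.type.dynamics import Dynamics
from empowerspark.target.type.dynamics.data_client import CRMClient

# Build a client from credentials, then hand it to the migration tool.
client = CRMClient.from_parameters(dynamics_url, tenant_id, client_id, client_secret, rtk)
migrator = Dynamics(client)

# Stage a delta table to the target environment (see "Load" below).
migrator.load("dynamics_database.account", "account_id")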

The migration tool provides three methods for loading data to the Dynamics environment.

  • load
  • load_dataframe
  • auto_load

These methods share a set of common parameters; specific details are outlined below.

  • key_columns - The list of columns to use as the primary key for joins. These columns are required when the mode is set to "upsert" and are ignored for other modes. The following are all valid formats for specifying the key columns:

    key_columns = "id"
    key_columns = ["id"]
    key_columns = ["id1", "id2"]
    key_columns = "id1,id2"
    
  • target_table - The name of the table in the Dynamics environment.

  • source_table - The name of the source table in the metastore. This is also used for error tracking and logging.

  • mode - The write mode for the operation. The currently supported modes are listed below; a usage sketch follows the list.

    1. upsert - The default mode. Records are upserted (inserted or merged) into the target, each independently.
    2. insert - All records are inserted into the target table, each independently.
    3. append - The source data is appended to the target table in a single transaction. In this mode, all records succeed or fail together; a single invalid record fails the entire batch.
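
As a sketch of how the modes differ in practice (assuming "key_columns" and "mode" are accepted as keyword arguments, which the parameter tables below suggest but do not confirm):

# Upsert (default): each record is inserted or merged by key, independently.
migrator.load("dynamics_database.account", key_columns="account_id")

# Insert: every record is inserted independently; failures are isolated.
migrator.load("dynamics_database.account", mode="insert")

# Append: the whole source is written in one transaction; a single invalid
# record fails the entire batch.
migrator.load("dynamics_database.account", mode="append")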

Load

The load method loads a delta table from the metastore directly to the Dynamics environment. This is the most common way to stage data, as the delta table provides easy tracking through its history and lineage as well as reusability across multiple operations. The method takes the following parameters:

Parameter     Required  Description
source_table  x         The name of the source table in the metastore.
key_columns             The primary key columns for the entity. Required when "mode" is set to "upsert".
target_table            The name of the table in the Dynamics environment. When not provided, defaults to the source table name.
mode                    The write mode for the operation. Defaults to "upsert" when not provided.

Example:

migrator.load("dynamics_database.account", "account_id")

Load Dataframe

The load_dataframe method loads a dataframe directly to the Dynamics environment. This is useful when minor transformations need to be applied to the data prior to loading it into the target. The method takes the following parameters:

Parameter     Required  Description
df            x         The source dataframe to load.
target_table  x         The name of the table in the Dynamics environment.
key_columns             The primary key columns for the entity. Required when "mode" is set to "upsert".
source_name             Sets the name of the source for logging. When not specified, this defaults to the value of "target_table".
mode                    The write mode for the operation. Defaults to "upsert" when not provided.

Example:

migrator.load_dataframe(df, "account_id", "account", "source_account")
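
For example, a light transformation can be applied before the load; the "spark" session and the column being trimmed are illustrative:

from pyspark.sql import functions as F

# Read the source and apply a small transformation before loading.
df = spark.table("dynamics_database.account")
df = df.withColumn("name", F.trim(F.col("name")))

migrator.load_dataframe(df, "account_id", "account", "source_account")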

Auto Load

The auto_load method uses Spark structured streaming to incrementally load data from the metastore to the Dynamics environment. This is useful when the source dataset is large, since the data is pushed up in batches. It can also be used to load data from a source that is being incrementally appended. The method takes the following parameters:

Parameter            Required  Description
checkpoint_location  x         The location to store the streaming checkpoint data.
source_table         x         The name of the source table in the metastore.
key_columns                    The primary key columns for the entity. Required when "mode" is set to "upsert".
target_table                   The name of the table in the Dynamics environment. If not specified, defaults to the source table name.
mode                           The write mode for the operation. Defaults to "upsert" when not provided.

Example:

migrator.auto_load("dynamics_database.account", "account_id", "dbfs:/empower/ddu_artifacts/account/checkpoints")

The user is expected to manage the checkpoint location as needed, for example deleting it when the source table has been overwritten with a new set of data.
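
On Databricks, for instance, the checkpoint directory can be removed with dbutils before reloading a rewritten source; a minimal sketch:

# Remove the checkpoint so the next auto_load starts from scratch
# (dbutils is Databricks-specific).
dbutils.fs.rm("dbfs:/empower/ddu_artifacts/account/checkpoints", recurse=True)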