Troubleshooting Guide
Analyzing Batch Metrics
Basic Metrics Query
SELECT
run_id,
source_table,
sum(partition_size) as total_processed,
sum(success_inserts) as success_inserts,
sum(failed_inserts) as failed_inserts,
sum(success_updates) as success_updates,
sum(failed_updates) as failed_updates,
sum(invalid_records) as invalid_records,
sum(skipped) as skipped,
sum(skips_missed) as skips_missed,
min(start_time) as start_time,
max(error) as last_error
FROM emp_dv_001.empower_ddu.batch_log_v2
GROUP BY run_id, source_table
ORDER BY min(start_time) DESC
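If you are working in a notebook, a minimal sketch for pulling these metrics into a DataFrame (assuming a Databricks/Spark environment where a `spark` session is in scope):

# A minimal sketch, assuming a Databricks notebook where `spark` is available
metrics_query = """
    SELECT run_id, source_table,
           sum(partition_size) as total_processed,
           sum(failed_inserts) + sum(failed_updates) as total_failures,
           min(start_time) as start_time
    FROM emp_dv_001.empower_ddu.batch_log_v2
    GROUP BY run_id, source_table
    ORDER BY min(start_time) DESC
"""
metrics_df = spark.sql(metrics_query)
metrics_df.show(20, truncate=False)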
Processing Time Analysis
SELECT
run_id,
cast(sum(pre_processing_time)/60 as decimal(18,5)) as pre_proc_mins,
cast(sum(processing_time)/60 as decimal(18,5)) as proc_mins,
cast(sum(post_processing_time)/60 as decimal(18,5)) as post_proc_mins,
cast(sum(total_processing_time)/60 as decimal(18,5)) as total_mins,
cast(sum(deferred_processing_time)/60 as decimal(18,5)) as deferred_proc_mins,
source_table,
min(start_time) as start_time
FROM emp_dv_001.empower_ddu.batch_log_v2
GROUP BY run_id, source_table
ORDER BY min(start_time) DESC
Common Metric Anomalies
- High Skips Missed Rate
  - Indicates pre-processing filtering isn't catching all unchanged records
  - Check query logs for repeat updates (see "Identify Repeated Updates" below)
  - Consider data normalization rules, for example:
config = {
    "advanced.data_transformation.case_insensitive_columns": [
        "firstname", "lastname", "address1_line1"
    ],
    "advanced.columns.compare_case_insensitive": True
}
- Large Number of Updates
  - Review query logs for patterns:
SELECT target_values_changed, count(*)
FROM emp_dv_001.empower_ddu.contact_query_log
WHERE run_id = '<run_id>'
AND mode = 'update'
GROUP BY target_values_changed
ORDER BY count(*) DESC
- High Invalid Records
  - Check error patterns:
SELECT `__ddu_error`, count(*)
FROM emp_dv_001.empower_ddu.contact
WHERE `__ddu_run_id` = '<run_id>'
GROUP BY `__ddu_error`
Using Query Logs for Troubleshooting
- Identify Repeated Updates
SELECT
key_values,
target_values_changed,
count(*) as update_count
FROM emp_dv_001.empower_ddu.contact_query_log
WHERE run_id = '<run_id>'
AND mode = 'update'
GROUP BY key_values, target_values_changed
HAVING count(*) > 1
ORDER BY count(*) DESC
- Analyze Changed Values
SELECT DISTINCT target_values_changed
FROM emp_dv_001.empower_ddu.contact_query_log
WHERE run_id = '<run_id>'
AND mode = 'update'
Data Normalization Solutions
- String Sanitization Rules
rules = [
# Replace "Inactive Former Member" with "Deceased Member"
create_text_replacement_rule(
old_text="Inactive Former Member",
new_text="Deceased Member",
columns=["customertypecode"],
apply_to_source=True
),
# Remove trailing slashes
create_rule(
pattern=r'^(.*?)(?<!\\)/+$',
replacement='$1',
columns=["new_otheroccupation", "new_otheroccupation2"],
apply_to_source=True,
apply_to_target=True
),
# Remove apostrophes
create_rule(
pattern=r"'",
replacement="",
columns=["new_otheroccupation", "emailaddress1"],
apply_to_source=True
)
]
config = {
"advanced.data_transformation.sanitization_rules": [rule.model_dump() for rule in rules]
}
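To sanity-check a pattern like the trailing-slash rule before deploying it, you can exercise the regex directly; a quick illustration with Python's re module (note that re uses \1 for backreferences where the rule above uses Java-style $1):

import re

# Same pattern as the trailing-slash rule above: strip trailing slashes,
# unless the final slash is escaped with a backslash
pattern = re.compile(r'^(.*?)(?<!\\)/+$')

for value in ["Engineer/", "Nurse//", "AC\\/DC\\/"]:
    # re uses \1 where the sanitization rule uses $1
    print(value, "->", pattern.sub(r'\1', value))
# Engineer/ -> Engineer
# Nurse// -> Nurse
# AC\/DC\/ -> AC\/DC\/   (escaped trailing slash is preserved)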
- Column Exclusions
config = {
"advanced.columns.exclude_columns": [
"owningteam_id",
"owningteam_logicalname",
"new_age" # Calculated field
]
}
Using Sampling for Testing
- Basic Sampling Configuration
sampling_config = {
"advanced.debug.sampling_enabled": True,
"advanced.debug.sample_size": 50_000,
}
- Targeted Sampling Using Batch Mapping
-- Get problematic record IDs
WITH problem_records AS (
SELECT key_values
FROM emp_dv_001.empower_ddu.contact_query_log
WHERE run_id = '<problem_run_id>'
AND status = 'failed'
)
SELECT id
FROM emp_dv_001.empower_ddu.contact_batch_mapping
WHERE run_id = '<current_run_id>'
AND id IN (SELECT key_values FROM problem_records)
Then use these IDs for sampling:
condition = "id in ('" + "','".join(key_list) + "')"
config = {
"advanced.debug.sampling_enabled": True,
"advanced.debug.sample_condition": condition,
}
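If the key list is large or values may contain single quotes, a slightly more defensive builder avoids malformed SQL (build_sample_condition is a hypothetical helper, not part of the product API):

def build_sample_condition(key_list, column="id"):
    # Double up embedded single quotes so each value stays a valid SQL literal
    quoted = ", ".join("'" + str(k).replace("'", "''") + "'" for k in key_list)
    return f"{column} in ({quoted})"

config = {
    "advanced.debug.sampling_enabled": True,
    "advanced.debug.sample_condition": build_sample_condition(key_list),
}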
Performance Optimization
When to Use Batching
-- Compare processing times with/without batching
WITH batch_stats AS (
SELECT
CASE
WHEN partition_size = source_count THEN 'no_batching'
ELSE 'batching'
END as process_type,
AVG(total_processing_time/60) as avg_mins,
SUM(partition_size) as total_records
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY
CASE
WHEN partition_size = source_count THEN 'no_batching'
ELSE 'batching'
END
)
SELECT * FROM batch_stats
Guidelines (see the decision sketch after this list):
- Enable batching when total records > 50,000
- Disable if total load time with batching > 1.2 * non-batched time
- Adjust batch size based on average total processing time per batch
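These thresholds fold into a small decision helper; the sketch below captures the rules of thumb only (should_enable_batching is a hypothetical name, not a product API):

def should_enable_batching(total_records, batched_mins=None, non_batched_mins=None):
    # Rule 2: disable if measured batched time exceeds 1.2x the non-batched baseline
    if batched_mins is not None and non_batched_mins is not None:
        if batched_mins > 1.2 * non_batched_mins:
            return False
    # Rule 1: batch once volume passes 50,000 records
    return total_records > 50_000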
Pushdown Filter Decision
-- Compare target load times
SELECT
run_id,
SUM(target_load_time) as total_load_time,
SUM(partition_size) as total_records
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY run_id
ORDER BY run_id DESC
Guidelines (see the sketch after this list):
- Disable pushdown if composite business keys are used
- Enable when target table size >> source updates
- Consider disabling if target load time > 30% of total processing time
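As with batching, the pushdown rules of thumb can be expressed as a small sketch (a hypothetical helper, not a product API):

def should_disable_pushdown(target_load_time, total_processing_time,
                            composite_business_keys=False):
    # Composite business keys rule pushdown out entirely
    if composite_business_keys:
        return True
    # Disable when target loads dominate: > 30% of total processing time
    return target_load_time > 0.30 * total_processing_time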
Common Issues and Solutions
- High Rate Limiting
config = {
# Increase worker count based on SP count
"advanced.performance.num_workers": num_sps * 50,
# Enable deferred retry
"advanced.performance.deferred_retry_enabled": True,
"advanced.performance.deferred_retry_delay": 300
}
- Memory Issues
config = {
# Reduce batch size
"advanced.performance.batch_size": 25_000,
# Enable ID-based strategy if possible
"advanced.performance.id_based_strategy_enabled": True
}
- Slow Processing
config = {
# Increase worker count
"advanced.performance.num_workers": 1600,
# Optimize connection pooling
"advanced.connection.use_multi_pool": True,
"advanced.connection.max_connections_per_sp": 50
}
- Data Quality Issues
config = {
# Normalize data handling
"advanced.columns.legacy_compare_values": False,
"advanced.columns.compare_case_insensitive": True,
# Enable extended state codes
"advanced.columns.use_extended_state_codes": True
}
Monitoring Dashboard Queries
Processing Time Trends
SELECT
DATE_TRUNC('hour', start_time) as hour,
AVG(total_processing_time/60) as avg_mins,
COUNT(DISTINCT run_id) as num_runs,
SUM(partition_size) as total_records
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY DATE_TRUNC('hour', start_time)
ORDER BY hour DESC
Error Rate Analysis
SELECT
run_id,
cast(sum(failed_inserts + failed_updates) * 100.0 /
NULLIF(sum(partition_size), 0) as decimal(5,2)) as error_rate,
sum(partition_size) as total_records,
min(start_time) as run_start
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY run_id
ORDER BY run_start DESC
Operation Distribution
SELECT
run_id,
sum(success_inserts) as inserts,
sum(success_updates) as updates,
sum(skipped) as skipped,
sum(invalid_records) as invalid,
min(start_time) as run_start
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY run_id
ORDER BY run_start DESC