Troubleshooting Guide

Analyzing Batch Metrics

Basic Metrics Query

SELECT 
    run_id,
    source_table,
    table_name,
    sum(partition_size) as total_processed,
    sum(success_inserts) as success_inserts,
    sum(failed_inserts) as failed_inserts,
    sum(success_updates) as success_updates,
    sum(failed_updates) as failed_updates,
    sum(invalid_records) as invalid_records,
    sum(skipped) as skipped,
    sum(skips_missed) as skips_missed,
    min(start_time) as start_time,
    max(error) as last_error
FROM emp_dv_001.empower_ddu.batch_log_v2
GROUP BY run_id, source_table, table_name
ORDER BY min(start_time) DESC

Processing Time Analysis

SELECT 
    run_id,
    cast(sum(pre_processing_time)/60 as decimal(18,5)) as pre_proc_mins,
    cast(sum(processing_time)/60 as decimal(18,5)) as proc_mins,
    cast(sum(post_processing_time)/60 as decimal(18,5)) as post_proc_mins,
    cast(sum(total_processing_time)/60 as decimal(18,5)) as total_mins,
    cast(sum(deferred_processing_time)/60 as decimal(18,5)) as deferred_proc_mins,
    source_table,
    table_name,
    min(start_time) as start_time
FROM emp_dv_001.empower_ddu.batch_log_v2
GROUP BY run_id, source_table, table_name
ORDER BY min(start_time) DESC

Common Metric Anomalies

  1. High Skips Missed Rate

    • Indicates pre-processing filtering isn't catching all unchanged records
    • Check query logs for repeated updates
    • Consider data normalization rules, for example:
    config = {
        "advanced.data_transformation.case_insensitive_columns": ["firstname", "lastname", "address1_line1"],
        "advanced.columns.compare_case_insensitive": True
    }
    
  2. Large Number of Updates

    • Review query logs for patterns:
    SELECT target_values_changed, count(*) 
    FROM emp_dv_001.empower_ddu.contact_query_log 
    WHERE run_id = '<run_id>' AND mode = 'update'
    GROUP BY target_values_changed
    ORDER BY count(*) DESC
    
  3. High Invalid Records

    • Check error patterns:
    SELECT `__ddu_error`, count(*) 
    FROM emp_dv_001.empower_ddu.contact 
    WHERE `__ddu_run_id` = '<run_id>'
    GROUP BY `__ddu_error`
    

Using Query Logs for Troubleshooting

  1. Identify Repeated Updates
SELECT 
    key_values,
    target_values_changed,
    count(*) as update_count
FROM emp_dv_001.empower_ddu.contact_query_log
WHERE run_id = '<run_id>' 
AND mode = 'update'
GROUP BY key_values, target_values_changed
HAVING count(*) > 1
ORDER BY count(*) DESC
  2. Analyze Changed Values
SELECT DISTINCT target_values_changed 
FROM emp_dv_001.empower_ddu.contact_query_log 
WHERE run_id = '<run_id>'
AND mode = 'update'

Data Normalization Solutions

  1. String Sanitization Rules
rules = [
    # Replace "Inactive Former Member" with "Deceased Member"
    create_text_replacement_rule(
        old_text="Inactive Former Member",
        new_text="Deceased Member",
        columns=["customertypecode"],
        apply_to_source=True
    ),
    
    # Remove trailing slashes
    create_rule(
        pattern=r'^(.*?)(?<!\\)/+$',
        replacement='$1',
        columns=["new_otheroccupation", "new_otheroccupation2"],
        apply_to_source=True,
        apply_to_target=True
    ),
    
    # Remove apostrophes
    create_rule(
        pattern=r"'",
        replacement="",
        columns=["new_otheroccupation", "emailaddress1"],
        apply_to_source=True
    )
]

config = {
    "advanced.data_transformation.sanitization_rules": [rule.model_dump() for rule in rules]
}
  2. Column Exclusions
config = {
    "advanced.columns.exclude_columns": [
        "owningteam_id", 
        "owningteam_logicalname",
        "new_age"  # Calculated field
    ]
}

Using Sampling for Testing

  1. Basic Sampling Configuration
sampling_config = {
    "advanced.debug.sampling_enabled": True,
    "advanced.debug.sample_size": 50_000,
}
  2. Targeted Sampling Using Batch Mapping
-- Get problematic record IDs
WITH problem_records AS (
    SELECT key_values
    FROM emp_dv_001.empower_ddu.contact_query_log
    WHERE run_id = '<problem_run_id>'
    AND status = 'failed'
)
SELECT id 
FROM emp_dv_001.empower_ddu.contact_batch_mapping
WHERE run_id = '<current_run_id>' 
AND id IN (SELECT key_values FROM problem_records)

Then use these IDs for sampling:

condition = "id in ('" + "','".join(key_list) + "')"
config = {
    "advanced.debug.sampling_enabled": True,
    "advanced.debug.sample_condition": condition,
}
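
The key_list above can be collected from the batch-mapping query; a minimal sketch, assuming a live PySpark session (spark) and the table names used throughout this guide:

rows = spark.sql("""
    WITH problem_records AS (
        SELECT key_values
        FROM emp_dv_001.empower_ddu.contact_query_log
        WHERE run_id = '<problem_run_id>'
        AND status = 'failed'
    )
    SELECT id
    FROM emp_dv_001.empower_ddu.contact_batch_mapping
    WHERE run_id = '<current_run_id>'
    AND id IN (SELECT key_values FROM problem_records)
""").collect()

# Feed the collected IDs into the sample_condition shown above
key_list = [row.id for row in rows]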

Performance Optimization

When to Use Batching

-- Compare processing times with/without batching
WITH batch_stats AS (
    SELECT 
        CASE 
            WHEN partition_size = source_count THEN 'no_batching'
            ELSE 'batching'
        END as process_type,
        AVG(total_processing_time/60) as avg_mins,
        SUM(partition_size) as total_records
    FROM emp_dv_001.empower_ddu.batch_log_v2
    WHERE source_table = 'contact'
    GROUP BY 
        CASE 
            WHEN partition_size = source_count THEN 'no_batching'
            ELSE 'batching'
        END
)
SELECT * FROM batch_stats

Guidelines:

  • Enable batching when total records > 50,000
  • Disable if total load time with batching > 1.2 * non-batched time
  • Adjust batch size based on average total processing time per batch (see the configuration sketch below)
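
As a starting point, a minimal configuration sketch for enabling batching. The batch_size key matches the one used in the Memory Issues example later in this guide; the enable flag name is an assumption and should be verified against your connector's settings:

config = {
    # Assumed key name; confirm against your connector's settings
    "advanced.performance.batching_enabled": True,
    # Same key as the Memory Issues example; tune based on avg processing time per batch
    "advanced.performance.batch_size": 50_000,
}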

Pushdown Filter Decision

-- Compare target load times
SELECT 
    run_id,
    SUM(target_load_time) as total_load_time,
    SUM(partition_size) as total_records
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY run_id
ORDER BY run_id DESC

Guidelines:

  • Disable pushdown if composite business keys are used
  • Enable when target table size >> source updates
  • Consider disabling if target load time > 30% of total processing time (see the sketch below)
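
A minimal sketch for toggling the pushdown filter according to the guidelines above; the key name is an assumption and should be checked against your connector's documented settings:

config = {
    # Assumed key name; disable pushdown when composite business keys are used
    # or when target load time dominates total processing time
    "advanced.performance.pushdown_filter_enabled": False,
}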

Common Issues and Solutions

  1. High Rate Limiting
config = {
    # Increase worker count based on SP count (num_sps = number of SPs available to the run)
    "advanced.performance.num_workers": num_sps * 50,
    # Enable deferred retry
    "advanced.performance.deferred_retry_enabled": True,
    "advanced.performance.deferred_retry_delay": 300
}
  2. Memory Issues
config = {
    # Reduce batch size
    "advanced.performance.batch_size": 25_000,
    # Enable ID-based strategy if possible
    "advanced.performance.id_based_strategy_enabled": True
}
  3. Slow Processing
config = {
    # Increase worker count
    "advanced.performance.num_workers": 1600,
    # Optimize connection pooling
    "advanced.connection.use_multi_pool": True,
    "advanced.connection.max_connections_per_sp": 50
}
  4. Data Quality Issues
config = {
    # Normalize data handling
    "advanced.columns.legacy_compare_values": False,
    "advanced.columns.compare_case_insensitive": True,
    # Enable extended state codes
    "advanced.columns.use_extended_state_codes": True
}

Monitoring Dashboard Queries

Processing Time Trends

SELECT 
    DATE_TRUNC('hour', start_time) as hour,
    AVG(total_processing_time/60) as avg_mins,
    COUNT(DISTINCT run_id) as num_runs,
    SUM(partition_size) as total_records
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY DATE_TRUNC('hour', start_time)
ORDER BY hour DESC

Error Rate Analysis

SELECT 
    run_id,
    (sum(failed_inserts + failed_updates)::float / 
     NULLIF(sum(partition_size), 0) * 100)::decimal(5,2) as error_rate,
    sum(partition_size) as total_records,
    min(start_time) as run_start
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY run_id
ORDER BY run_start DESC

Operation Distribution

SELECT 
    run_id,
    sum(success_inserts) as inserts,
    sum(success_updates) as updates,
    sum(skipped) as skipped,
    sum(invalid_records) as invalid,
    min(start_time) as run_start
FROM emp_dv_001.empower_ddu.batch_log_v2
WHERE source_table = 'contact'
GROUP BY run_id
ORDER BY run_start DESC