Configuration

PipeGen uses YAML configuration files to define pipeline behavior, connections, and processing logic.

Configuration File Structure

yaml
# config.yaml
pipeline:
  name: "my-streaming-pipeline"
  version: "1.0.0"
  description: "Process user events in real-time"

kafka:
  brokers:
    - "localhost:9093"
    - "kafka-2:9092"
  topics:
    input: "user-events"
    output: "processed-events"
  consumer_group: "pipegen-processor"
  
flink:
  jobmanager_url: "http://localhost:8081"
  parallelism: 4
  checkpoint:
    interval: "30s"
    timeout: "10m"
    mode: "exactly_once"
  memory:
    jobmanager: "1gb"
    taskmanager: "2gb"

schemas:
  input: "schemas/user_event.avsc"
  output: "schemas/processed_event.avsc"
  
processing:
  sql_files:
    - "sql/01_create_source_table.sql"
    - "sql/02_create_processing.sql"
    - "sql/03_create_output_table.sql"
    - "sql/04_insert_results.sql"

# Connector Management
connectors:
  directory: "connectors/"
  auto_populate: true
  restart_required: true  # Restart containers after adding custom JARs

monitoring:
  metrics_enabled: true
  log_level: "info"

Configuration Sections

Pipeline Configuration

yaml
pipeline:
  name: "my-pipeline"           # Pipeline identifier
  version: "1.0.0"              # Version for tracking
  description: "Pipeline desc"  # Human-readable description
  tags:                         # Optional tags for organization
    - "production"
    - "user-analytics"

Kafka Configuration

yaml
kafka:
  brokers:                      # List of Kafka brokers
    - "broker1:9092"
    - "broker2:9092"
  topics:
    input: "input-topic"        # Input topic name
    output: "output-topic"      # Output topic name
    error: "error-topic"        # Error topic (optional)
  consumer_group: "pipegen"     # Consumer group ID
  
  # Advanced Kafka settings
  producer:
    batch_size: 16384
    linger_ms: 5
    compression_type: "gzip"
    acks: "all"
    retries: 2147483647
    
  consumer:
    auto_offset_reset: "earliest"
    enable_auto_commit: false
    session_timeout_ms: 30000
    max_poll_records: 500

Flink Configuration

yaml
flink:
  jobmanager_url: "http://localhost:8081"
  
  # Parallelism and scaling
  parallelism: 4                # Default parallelism
  max_parallelism: 128          # Maximum parallelism
  
  # Checkpointing
  checkpoint:
    interval: "30s"             # Checkpoint interval
    timeout: "10m"              # Checkpoint timeout
    min_pause: "5s"             # Minimum pause between checkpoints
    max_concurrent: 1           # Max concurrent checkpoints
    mode: "exactly_once"        # Processing guarantee
    
  # Resource allocation
  memory:
    jobmanager: "1gb"
    taskmanager: "2gb"
    network: "128mb"
    
  # Recovery
  restart_strategy:
    type: "failure_rate"
    failure_rate: 3
    failure_interval: "5m"
    delay: "10s"

Schema Configuration

yaml
schemas:
  input: "schemas/input.avsc"                  # Input schema file (canonical)
  output: "schemas/output_result.avsc"         # Output schema file (AI path)
  format: "avro"                        # Schema format (avro, json)
  
  # Schema evolution settings
  compatibility: "backward"             # Compatibility mode
  auto_register: true                   # Auto-register schemas
  
  # Schema registry (if using Confluent Schema Registry)
  registry:
    url: "http://schema-registry:8081"
    auth:
      username: "user"
      password: "pass"

Processing Configuration

yaml
processing:
  sql_files:                            # SQL files in execution order
    - "sql/01_create_source.sql"
    - "sql/02_transform.sql"
    - "sql/03_create_sink.sql"
    - "sql/04_insert.sql"
    
  # Custom functions
  functions:
    - name: "CUSTOM_AGGREGATE"
      class: "com.example.CustomAggregate"
      jar: "libs/custom-functions.jar"

Runtime Configuration

Control pipeline execution behavior and timing:

yaml
runtime:
  # Producer settings
  message_rate: 100                     # Messages per second
  duration: "30s"                       # Producer execution duration

  # Pipeline timing
  pipeline_timeout: "5m"                # Overall pipeline timeout
  expected_messages: 0                  # Expected consumer messages (0 = auto-calculate)

  # Execution control
  cleanup: true                         # Clean up resources after execution
  generate_report: true                 # Generate HTML execution reports
  reports_dir: "./reports"              # Reports output directory

  # Traffic patterns (for load testing)
  traffic_patterns:
    - time_range: "30s-60s"
      rate_multiplier: 3.0              # 300% of baseline rate
    - time_range: "90s-120s"
      rate_multiplier: 2.0              # 200% of baseline rate

Smart Consumer Stopping

PipeGen automatically calculates the expected message count and stops the consumer once it has been reached:

  • Auto-calculation: Uses producer statistics to determine the expected message count
  • Manual override: Set expected_messages for precise control (see the sketch below)
  • Progress tracking: Real-time updates show completion percentage
  • Smart timeout: Stops after 30s if no messages are received
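
For example, to bypass auto-calculation and stop after a fixed count, set expected_messages in the runtime block shown above (the value here is illustrative):

yaml
runtime:
  expected_messages: 3000               # Consumer stops once 3000 messages are read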

Pipeline Timing

Two separate timeout controls provide flexibility:

  • Producer Duration (duration): How long the producer runs
  • Pipeline Timeout (pipeline_timeout): Total time allowed for complete pipeline execution

This separation allows quick producer cycles while giving Flink adequate processing time.
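
For instance, using the runtime keys shown above, a short producer run can be paired with a larger overall budget (values are illustrative):

yaml
runtime:
  message_rate: 100                     # Producer sends ~100 messages per second
  duration: "30s"                       # Producer stops after 30 seconds (~3,000 messages)
  pipeline_timeout: "5m"                # Flink still has up to 5 minutes to finish processing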

Monitoring Configuration

yaml
monitoring:
  metrics_enabled: true                 # Enable metrics collection
  dashboard_port: 8080                  # Dashboard port
  log_level: "info"                     # Log level
  
  # Prometheus metrics
  prometheus:
    enabled: true
    port: 9090
    endpoint: "/metrics"
    
  # Health checks
  health_check:
    enabled: true
    port: 8081
    endpoint: "/health"
    interval: "30s"

Environment-Specific Configurations

Local Development

yaml
# config-local.yaml
kafka:
  brokers:
    - "localhost:9093"

flink:
  jobmanager_url: "http://localhost:8081"
  parallelism: 1
  memory:
    jobmanager: "512mb"
    taskmanager: "1gb"
    
monitoring:
  dashboard_port: 8080
  log_level: "debug"

Staging Environment

yaml
# config-staging.yaml
kafka:
  brokers:
    - "kafka-staging-1:9092"
    - "kafka-staging-2:9092"
    
flink:
  jobmanager_url: "http://flink-staging:8081"
  parallelism: 2
  memory:
    jobmanager: "1gb"
    taskmanager: "2gb"
    
monitoring:
  log_level: "warn"

Production Environment

yaml
# config-production.yaml
kafka:
  brokers:
    - "kafka-prod-1:9092"
    - "kafka-prod-2:9092"
    - "kafka-prod-3:9092"
    
flink:
  jobmanager_url: "http://flink-prod:8081"
  parallelism: 8
  max_parallelism: 128
  
  checkpoint:
    interval: "10s"
    mode: "exactly_once"
    
  memory:
    jobmanager: "2gb"
    taskmanager: "4gb"
    
  restart_strategy:
    type: "failure_rate"
    failure_rate: 5
    failure_interval: "10m"
    
monitoring:
  log_level: "error"
  prometheus:
    enabled: true

Configuration Validation

Use the validate command to check configuration:

bash
# Validate default configuration
pipegen validate

# Validate specific configuration
pipegen validate --config config-production.yaml

# Strict validation
pipegen validate --strict

Environment Variables

Override configuration with environment variables:

bash
export PIPEGEN_KAFKA_BROKERS="prod-kafka-1:9092,prod-kafka-2:9092"
export PIPEGEN_FLINK_PARALLELISM=8
export PIPEGEN_LOG_LEVEL=warn

pipegen run

Variable naming convention:

  • Prefix: PIPEGEN_
  • Nested keys: Use _ separator
  • Arrays: Comma-separated values
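
For example, following this convention (the checkpoint variable below is an assumed mapping, shown for illustration only):

bash
# List values are comma-separated
export PIPEGEN_KAFKA_BROKERS="broker1:9092,broker2:9092"

# Nested keys join with "_", e.g. flink.checkpoint.interval (assumed mapping)
export PIPEGEN_FLINK_CHECKPOINT_INTERVAL="10s"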

Configuration Templating

Use templates for dynamic configuration:

yaml
# config-template.yaml
kafka:
  brokers:
    - "{{.KAFKA_BROKER_1}}:9092"
    - "{{.KAFKA_BROKER_2}}:9092"
    
flink:
  parallelism: {{.FLINK_PARALLELISM | default 4}}
  memory:
    taskmanager: "{{.TASKMANAGER_MEMORY | default "2gb"}}"

Configuration Inheritance

Extend base configurations using YAML anchors and merge keys:

yaml
# base-config.yaml
kafka: &kafka
  consumer_group: "pipegen"
  topics:
    error: "pipeline-errors"
    
flink: &flink
  checkpoint:
    mode: "exactly_once"
    interval: "30s"

# production-config.yaml
kafka:
  <<: *kafka
  brokers:
    - "prod-kafka:9092"
    
flink:
  <<: *flink
  parallelism: 8

Security Configuration

SSL/TLS Configuration

yaml
kafka:
  security:
    protocol: "SSL"
    ssl:
      truststore_location: "/path/to/truststore.jks"
      truststore_password: "truststore-password"
      keystore_location: "/path/to/keystore.jks"
      keystore_password: "keystore-password"
      key_password: "key-password"

SASL Authentication

yaml
kafka:
  security:
    protocol: "SASL_SSL"
    sasl:
      mechanism: "PLAIN"
      username: "kafka-user"
      password: "kafka-password"

Best Practices

Configuration Organization

  • Use environment-specific files
  • Keep sensitive data in environment variables
  • Validate configurations in CI/CD
  • Version control configuration files
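
A simple CI/CD validation sketch, using the validate flags documented above (the file names match the environment-specific examples on this page):

bash
# Fail the build if any environment configuration is invalid
for cfg in config-local.yaml config-staging.yaml config-production.yaml; do
  pipegen validate --config "$cfg" --strict
done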

Connector Management

PipeGen automatically manages Flink connector JARs in the connectors/ directory:

Automatic Population:

  • Downloads required connectors during project initialization
  • Includes Kafka, AVRO, Schema Registry connectors
  • Version-aligned with Flink 1.18.x

Adding Custom Connectors:

bash
# Copy custom connector JARs to connectors directory
cp my-database-connector.jar ./connectors/
cp custom-format-connector.jar ./connectors/

# Restart containers to load new connectors
docker-compose restart flink-jobmanager flink-taskmanager sql-gateway

# Verify connectors are loaded
docker exec flink-jobmanager ls -la /opt/flink/lib/

Default Connectors Included:

  • Kafka Connector (flink-sql-connector-kafka-3.1.0-1.18.jar)
  • AVRO Schema Registry (flink-sql-avro-confluent-registry-1.18.1.jar)
  • Kafka Clients (kafka-clients-3.4.0.jar)
  • Jackson JSON Libraries (jackson-core, jackson-databind, jackson-annotations)
  • Supporting Libraries (Guava, CSV/JSON formatters)

Important Notes:

  • Container restart is required after adding custom JARs
  • Ensure connector versions are compatible with Flink 1.18.x
  • JARs are copied from ./connectors/ to /opt/flink/lib/ during container startup

Performance Tuning

  • Set appropriate parallelism for your workload
  • Configure memory based on data volume
  • Tune checkpoint intervals for your latency requirements
  • Monitor and adjust based on metrics

Security

  • Use SSL/TLS for production
  • Store passwords in secure vaults
  • Limit access to configuration files
  • Regular security audits
