# pipegen init
Create a new PipeGen project with SQL, AVRO schemas, config, and optional AI-powered generation.
## Usage

```bash
pipegen init <project-name> [flags]
```

## Flags

- `--force` – Overwrite an existing project directory
- `--input-schema` – Path to an AVRO schema (`input.avsc`) to seed the project
- `--input-csv` – Path to a CSV file to infer a schema & generate a filesystem source table
- `--describe` – Natural-language description for AI generation
- `--domain` – Business domain for better AI context (e.g., `ecommerce`, `fintech`, `iot`)
- `--help` – Show help
## Examples

```bash
# Basic project
pipegen init my-pipeline

# Initialize from an existing AVSC (schema-driven)
pipegen init payments --input-schema ./schemas/input.avsc

# AI generation from description
export PIPEGEN_OLLAMA_MODEL=llama3.1  # or set PIPEGEN_OPENAI_API_KEY
pipegen init fraud-detection --describe "Real-time fraud detection for transactions" --domain fintech

# Ground AI with your AVSC (combine flags)
pipegen init analytics \
  --input-schema ./schemas/input.avsc \
  --describe "Hourly revenue per user with watermarking" \
  --domain ecommerce

# Initialize from a CSV file (schema inferred, filesystem source created)
pipegen init web-events --input-csv ./data/web_events_sample.csv

# CSV + AI (AI grounded with inferred schema & profile of CSV columns)
pipegen init session-metrics \
  --input-csv ./data/sessions.csv \
  --describe "Session duration, bounce rate, and active user aggregation" \
  --domain ecommerce
```

## Behavior
### Without `--describe` (standard generation)

- Uses your `--input-schema` if provided; otherwise generates a default AVRO schema
- Synthesizes a baseline Flink source DDL at `sql/01_create_source_table.sql` (Kafka + `avro-confluent`)
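To make the baseline concrete, a Kafka + `avro-confluent` source DDL of the kind described above might look roughly like the sketch below. The table name, columns, topic, and endpoint URLs are illustrative assumptions, not the exact DDL PipeGen writes:

```sql
-- Illustrative sketch only: names, topic, and URLs below are placeholders.
-- PipeGen derives the real column list from your AVRO schema.
CREATE TABLE input_events (
  event_id   STRING,
  user_id    STRING,
  amount     DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'input-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
```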
### With `--describe` (AI generation)

- If AI is configured, PipeGen generates schemas, SQL, and docs; when `--input-schema` is provided, the AI is grounded on your schema and `schemas/input.avsc` is kept canonical
- If AI is not configured, PipeGen falls back automatically:
  - With `--input-schema`: schema-driven generation with baseline DDL
  - Without `--input-schema`: a minimal project with the default schema and templates
With
--input-csv <file>(CSV-driven generation)- Streaming analyzer reads the CSV (memory-safe, incremental) and infers:
- Column names & order
- Data types (int, long, double, boolean, string, timestamp) with numeric vs categorical heuristics
- Nullability & value counts
- Sample values for prompt grounding
- Generates an inferred AVRO schema at
schemas/input.avsc - Creates a filesystem source table at
sql/01_create_source_table.sqlusing the Flinkfilesystemconnector +csvformat - Marks the project as CSV mode (auto-detected later by
pipegen run– no extra flag needed) - If
--describeis also passed, AI prompt is enriched with a markdown analysis of each column (distribution, sample values) to produce higher-quality aggregations - Output / aggregation SQL is generated the same way as schema-driven mode, but grounded in your real data profile
- Streaming analyzer reads the CSV (memory-safe, incremental) and infers:
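As a sketch of the CSV path, the generated filesystem source table might look like the following; the column names and types here are hypothetical stand-ins for whatever the analyzer infers from your file:

```sql
-- Illustrative sketch only: columns are placeholders for whatever the
-- streaming analyzer infers from the CSV header and sampled rows.
CREATE TABLE input_events (
  session_id       STRING,
  user_id          STRING,
  duration_seconds BIGINT,
  bounced          BOOLEAN
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/sessions.csv',
  'format' = 'csv'
);
```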
## Generated Files

When you run `pipegen init`, it creates:

- `.pipegen.yaml` – Project configuration
- `schemas/` – AVRO schemas:
  - `input.avsc` (canonical input schema)
  - `output_result.avsc` (AI path)
- `sql/` – Flink SQL files (includes `01_create_source_table.sql` in the schema-driven path)
  - In CSV mode the source table uses:
    - `'connector' = 'filesystem'`
    - `'path' = '<your CSV path>'`
    - `'format' = 'csv'`
    - Proper column definitions inferred from the analyzer
- `docker-compose.yml`, `flink-conf.yaml`, `flink-entrypoint.sh` – local stack
- `README.md` – Project documentation
- `sql/OPTIMIZATIONS.md` – AI optimization suggestions (AI path)
## AI Providers

Configure one of the following to enable AI:

- Ollama (local): `PIPEGEN_OLLAMA_MODEL=llama3.1` and optionally `PIPEGEN_OLLAMA_URL=http://localhost:11434`
- OpenAI (cloud): `PIPEGEN_OPENAI_API_KEY=...` and optionally `PIPEGEN_LLM_MODEL=gpt-4o-mini`

If neither is set, `--describe` gracefully falls back as described above.
## Next Steps

- `cd <project-name>`
- Review the generated `schemas/` and `sql/`
- Start the local stack: `pipegen deploy`
- Run the pipeline: `pipegen run`
- In CSV mode the run automatically skips the Kafka producer (the filesystem source supplies the data) while still starting the Kafka consumer to validate downstream output.
See also: AI Generation, Getting Started
