annotating-task-lineage

astronomer/agents · updated Apr 8, 2026

MDX-style export adds YAML metadata + attribution linking explainx.ai and this canonical listing URL.

$npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage
0 commentsdiscussion
summary

Annotate Airflow tasks with data lineage using inlets and outlets.

  • Supports OpenLineage Dataset objects, Airflow Assets, and Airflow Datasets for defining inputs and outputs across databases, data warehouses, and cloud storage
  • Use as a fallback when operators lack built-in OpenLineage extractors; follows a four-tier precedence system where custom extractors and OpenLineage methods take priority
  • Includes dataset naming helpers for Snowflake, BigQuery, S3, and PostgreSQL to ensure cons
skill.md

Annotating Task Lineage with Inlets & Outlets

This skill guides you through adding manual lineage annotations to Airflow tasks using inlets and outlets.

Reference: See the OpenLineage provider developer guide for the latest supported operators and patterns.

On Astro

Lineage annotations defined with inlets and outlets are visualized in Astro's enhanced Lineage tab, which provides cross-DAG and cross-deployment lineage views. This means your annotations are immediately visible in the Astro UI, giving you a unified view of data flow across your entire Astro organization.

When to Use This Approach

Scenario Use Inlets/Outlets?
Operator has OpenLineage methods (get_openlineage_facets_on_*) ❌ Modify the OL method directly
Operator has no built-in OpenLineage extractor ✅ Yes
Simple table-level lineage is sufficient ✅ Yes
Quick lineage setup without custom code ✅ Yes
Need column-level lineage ❌ Use OpenLineage methods or custom extractor
Complex extraction logic needed ❌ Use OpenLineage methods or custom extractor

Note: Inlets/outlets are the lowest-priority fallback. If an OpenLineage extractor or method exists for the operator, it takes precedence. Use this approach for operators without extractors.


Supported Types for Inlets/Outlets

You can use OpenLineage Dataset objects or Airflow Assets for inlets and outlets:

OpenLineage Datasets (Recommended)

from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)

Airflow Assets (Airflow 3+)

from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")

Airflow Datasets (Airflow 2.4+)

from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")

Basic Usage

Setting Inlets and Outlets on Operators

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],           # What this task reads
        outlets=[target_table],          # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],           # Reads from previous output
        outlets=[output_file],           # Writes to S3
    )

    transform >> export

Multiple Inputs and Outputs

Tasks often read from multiple sources and write to multiple destinations:

from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],      # All inputs
    outlets=[daily_summary, customer_metrics], # All outputs
)

Setting Lineage in Custom Operators

When building custom operators, you have two options:

Option 1: Implement OpenLineage Methods (Recommended)

This is the preferred approach as it gives you full control over lineage extraction:

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )

Option 2: Set Inlets/Outlets Dynamically

For simpler cases, set lineage within the execute method (non-deferrable operators only):

from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):<
how to use annotating-task-lineage

How to use annotating-task-lineage on Cursor

AI-first code editor with Composer

1

Prerequisites

Before installing skills in Cursor, ensure your development environment meets these requirements:

  • Cursor installed and configured on your development machine
  • Node.js version 16.0+ with npm package manager (verify with node --version)
  • Active project directory or workspace where you want to add annotating-task-lineage
2

Execute installation command

Execute the skills CLI command in your project's root directory to begin installation:

$npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage

The skills CLI fetches annotating-task-lineage from GitHub repository astronomer/agents and configures it for Cursor.

3

Select Cursor when prompted

The CLI will show a list of available agents. Use arrow keys to navigate and space to select Cursor:

◆ Which agents do you want to install to?
│ ── Universal (.agents/skills) ── always included ────
│ • Amp
│ • Antigravity
│ • Cline
│ • Codex
│ ●Cursor(selected)
│ • Cursor
│ • Windsurf
4

Verify installation

Confirm successful installation by checking the skill directory location:

.cursor/skills/annotating-task-lineage

Reload or restart Cursor to activate annotating-task-lineage. Access the skill through slash commands (e.g., /annotating-task-lineage) or your agent's skill management interface.

Security & Verification Notice

We perform automated surface-level scans (Gen AI Scanner, Socket, Snyk) during installation. These checks detect common vulnerabilities but do not guarantee complete security. Always review skill source code and verify the publisher's reputation before production use.

Skills execute code in your development environment. Always verify the publisher's identity, review recent commits, and test in isolated environments before production deployment.

List & Monetize Your Skill

Submit your Claude Code skill and start earning

GET_STARTED →

Use Cases

User Story & Requirements Generation

Create detailed user stories, acceptance criteria, and feature specs

Example

Generate user stories for 'password reset feature' with acceptance criteria, edge cases, and test scenarios

Reduce spec writing time by 50%, ensure comprehensive coverage

Competitive Analysis

Research competitors, compare features, identify gaps

Example

Analyze 5 competitor products, create feature comparison matrix, suggest differentiation opportunities

Complete competitive research in 2 hours instead of 2 days

Roadmap Prioritization

Evaluate features using frameworks (RICE, ICE, Kano) and create prioritized backlogs

Example

Score 20 feature ideas using RICE framework, generate prioritized roadmap with rationale

Make data-driven prioritization decisions faster

Stakeholder Communication

Draft PRDs, status updates, and stakeholder presentations

Example

Create executive summary of Q3 roadmap, monthly progress report, feature launch announcement

Save 3-5 hours/week on communication overhead

Implementation Guide

Prerequisites

  • Claude Desktop or compatible AI client
  • Access to product documentation and roadmap tools (Jira, Notion, etc.)
  • Understanding of product management frameworks (RICE, Jobs-to-be-Done, etc.)
  • Stakeholder contact information and communication channels

Time Estimate

30-60 minutes to see productivity improvements

Installation Steps

  1. 1.Install product management skill
  2. 2.Start with user story generation for known feature
  3. 3.Progress to competitive analysis: research 2-3 competitors
  4. 4.Use for roadmap prioritization: apply RICE/ICE scoring
  5. 5.Draft stakeholder communications and refine based on feedback
  6. 6.Build template library for recurring PM tasks
  7. 7.Share effective prompts with product team

Common Pitfalls

  • Not validating competitive research—verify facts before sharing
  • Accepting user stories without involving engineering team
  • Over-relying on frameworks without qualitative judgment
  • Not customizing outputs to company culture and communication style
  • Skipping stakeholder validation of generated requirements

Best Practices

✓ Do

  • +Validate research and competitive analysis with real data
  • +Collaborate with engineering when generating technical requirements
  • +Customize frameworks and templates to your company context
  • +Use skill for first drafts, refine with stakeholder input
  • +Document successful prompt patterns for PM tasks
  • +Combine AI efficiency with human judgment and intuition

✗ Don't

  • Don't publish competitive analysis without fact-checking
  • Don't finalize user stories without engineering review
  • Don't make prioritization decisions solely on AI scoring
  • Don't skip customer validation of generated requirements
  • Don't ignore company-specific context and culture

💡 Pro Tips

  • Provide context: company goals, constraints, customer feedback
  • Ask for alternatives: 'Show 3 ways to prioritize this roadmap'
  • Request stakeholder-specific formatting: 'Executive summary vs. engineering spec'
  • Use skill for 70% generation + 30% customization to company needs

When to Use This

✓ Use When

Use for user story writing, competitive research, roadmap prioritization, stakeholder communication, and PRD drafting. Best for reducing repetitive documentation and research work.

✗ Avoid When

Avoid for strategic product vision (requires deep customer empathy), pricing decisions (needs market and financial expertise), or when face-to-face customer discovery is more valuable than speed.

Learning Path

  1. 1Basic: user stories, feature specs, status updates
  2. 2Intermediate: competitive analysis, prioritization frameworks, PRDs
  3. 3Advanced: product strategy, go-to-market planning, OKR setting
  4. 4Expert: product vision, market positioning, business model innovation

Discussion

Product Hunt–style comments (not star reviews)
  • No comments yet — start the thread.
general reviews

Ratings

4.835 reviews
  • Fatima Garcia· Dec 28, 2024

    Registry listing for annotating-task-lineage matched our evaluation — installs cleanly and behaves as described in the markdown.

  • Shikha Mishra· Dec 8, 2024

    annotating-task-lineage reduced setup friction for our internal harness; good balance of opinion and flexibility.

  • Naina Gill· Dec 8, 2024

    annotating-task-lineage reduced setup friction for our internal harness; good balance of opinion and flexibility.

  • Yash Thakker· Nov 27, 2024

    I recommend annotating-task-lineage for anyone iterating fast on agent tooling; clear intent and a small, reviewable surface area.

  • Naina Rao· Nov 27, 2024

    I recommend annotating-task-lineage for anyone iterating fast on agent tooling; clear intent and a small, reviewable surface area.

  • Fatima Johnson· Nov 23, 2024

    Keeps context tight: annotating-task-lineage is the kind of skill you can hand to a new teammate without a long onboarding doc.

  • Neel Bansal· Nov 19, 2024

    Useful defaults in annotating-task-lineage — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.

  • Dhruvi Jain· Oct 18, 2024

    Useful defaults in annotating-task-lineage — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.

  • Yusuf Martin· Oct 18, 2024

    Useful defaults in annotating-task-lineage — fewer surprises than typical one-off scripts, and it plays nicely with `npx skills` flows.

  • Neel Abbas· Oct 14, 2024

    annotating-task-lineage is among the better-maintained entries we tried; worth keeping pinned for repeat workflows.

showing 1-10 of 35

1 / 4