SQL Queries
Important Capabilities
| Capability | Status | Notes | 
|---|---|---|
| Column-level Lineage | ✅ | Parsed from SQL queries | 
| Table-Level Lineage | ✅ | Parsed from SQL queries | 
This source reads a newline-delimited JSON file containing SQL queries and parses them to generate lineage.
Query File Format
This file should contain one JSON object per line, with the following fields:
- query: string - The SQL query to parse.
- timestamp (optional): number - The timestamp of the query, in seconds since the epoch.
- user (optional): string - The user who ran the query. This user value will be directly converted into a DataHub user urn.
- operation_type (optional): string - Platform-specific operation type, used if the operation type can't be parsed.
- downstream_tables (optional): string[] - Fallback list of tables that the query writes to, used if the query can't be parsed.
- upstream_tables (optional): string[] - Fallback list of tables the query reads from, used if the query can't be parsed.
Example Queries File
{"query": "SELECT x FROM my_table", "timestamp": 1689232738.051, "user": "user_a", "downstream_tables": [], "upstream_tables": ["my_database.my_schema.my_table"]}
{"query": "INSERT INTO my_table VALUES (1, 'a')", "timestamp": 1689232737.669, "user": "user_b", "downstream_tables": ["my_database.my_schema.my_table"], "upstream_tables": []}
Note that this file does not represent a single JSON object, but instead newline-delimited JSON, in which each line is a separate JSON object.
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[sql-queries]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
datahub_api:  # Only necessary if using a non-DataHub sink, e.g. the file sink
  server: http://localhost:8080
  timeout_sec: 60
source:
  type: sql-queries
  config:
    platform: "snowflake"
    default_db: "SNOWFLAKE"
    query_file: "./queries.json"
Config Details
- Options
- Schema
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description | 
|---|---|
| platform ✅ string | The platform for which to generate data, e.g. snowflake | 
| query_file ✅ string | Path to file to ingest | 
| default_db string | The default database to use for unqualified table names | 
| default_schema string | The default schema to use for unqualified table names | 
| platform_instance string | The instance of the platform that all assets produced by this recipe belong to | 
| env string | The environment that all assets produced by this connector belong to Default: PROD | 
| usage BaseUsageConfig | The usage config to use when generating usage statistics Default: {'bucket_duration': 'DAY', 'end_time': '2024-01-20... | 
| usage.bucket_duration Enum | Size of the time window to aggregate usage stats. Default: DAY | 
| usage.end_time string(date-time) | Latest date of lineage/usage to consider. Default: Current time in UTC | 
| usage.format_sql_queries boolean | Whether to format sql queries Default: False | 
| usage.include_operational_stats boolean | Whether to display operational stats. Default: True | 
| usage.include_read_operational_stats boolean | Whether to report read operational stats. Experimental. Default: False | 
| usage.include_top_n_queries boolean | Whether to ingest the top_n_queries. Default: True | 
| usage.start_time string(date-time) | Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on bucket_duration). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'. | 
| usage.top_n_queries integer | Number of top queries to save to each table. Default: 10 | 
| usage.user_email_pattern AllowDenyPattern | regex patterns for user emails to filter in usage. Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
| usage.user_email_pattern.allow array(string) | |
| usage.user_email_pattern.deny array(string) | |
| usage.user_email_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
The JSONSchema for this configuration is inlined below.
{
  "title": "SqlQueriesSourceConfig",
  "description": "Any source that connects to a platform should inherit this class",
  "type": "object",
  "properties": {
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to",
      "type": "string"
    },
    "query_file": {
      "title": "Query File",
      "description": "Path to file to ingest",
      "type": "string"
    },
    "platform": {
      "title": "Platform",
      "description": "The platform for which to generate data, e.g. snowflake",
      "type": "string"
    },
    "usage": {
      "title": "Usage",
      "description": "The usage config to use when generating usage statistics",
      "default": {
        "bucket_duration": "DAY",
        "end_time": "2024-01-20T14:22:42.922837+00:00",
        "start_time": "2024-01-19T00:00:00+00:00",
        "queries_character_limit": 24000,
        "top_n_queries": 10,
        "user_email_pattern": {
          "allow": [
            ".*"
          ],
          "deny": [],
          "ignoreCase": true
        },
        "include_operational_stats": true,
        "include_read_operational_stats": false,
        "format_sql_queries": false,
        "include_top_n_queries": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/BaseUsageConfig"
        }
      ]
    },
    "default_db": {
      "title": "Default Db",
      "description": "The default database to use for unqualified table names",
      "type": "string"
    },
    "default_schema": {
      "title": "Default Schema",
      "description": "The default schema to use for unqualified table names",
      "type": "string"
    }
  },
  "required": [
    "query_file",
    "platform"
  ],
  "additionalProperties": false,
  "definitions": {
    "BucketDuration": {
      "title": "BucketDuration",
      "description": "An enumeration.",
      "enum": [
        "DAY",
        "HOUR"
      ],
      "type": "string"
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "BaseUsageConfig": {
      "title": "BaseUsageConfig",
      "type": "object",
      "properties": {
        "bucket_duration": {
          "description": "Size of the time window to aggregate usage stats.",
          "default": "DAY",
          "allOf": [
            {
              "$ref": "#/definitions/BucketDuration"
            }
          ]
        },
        "end_time": {
          "title": "End Time",
          "description": "Latest date of lineage/usage to consider. Default: Current time in UTC",
          "type": "string",
          "format": "date-time"
        },
        "start_time": {
          "title": "Start Time",
          "description": "Earliest date of lineage/usage to consider. Default: Last full day in UTC (or hour, depending on `bucket_duration`). You can also specify relative time with respect to end_time such as '-7 days' Or '-7d'.",
          "type": "string",
          "format": "date-time"
        },
        "top_n_queries": {
          "title": "Top N Queries",
          "description": "Number of top queries to save to each table.",
          "default": 10,
          "exclusiveMinimum": 0,
          "type": "integer"
        },
        "user_email_pattern": {
          "title": "User Email Pattern",
          "description": "regex patterns for user emails to filter in usage.",
          "default": {
            "allow": [
              ".*"
            ],
            "deny": [],
            "ignoreCase": true
          },
          "allOf": [
            {
              "$ref": "#/definitions/AllowDenyPattern"
            }
          ]
        },
        "include_operational_stats": {
          "title": "Include Operational Stats",
          "description": "Whether to display operational stats.",
          "default": true,
          "type": "boolean"
        },
        "include_read_operational_stats": {
          "title": "Include Read Operational Stats",
          "description": "Whether to report read operational stats. Experimental.",
          "default": false,
          "type": "boolean"
        },
        "format_sql_queries": {
          "title": "Format Sql Queries",
          "description": "Whether to format sql queries",
          "default": false,
          "type": "boolean"
        },
        "include_top_n_queries": {
          "title": "Include Top N Queries",
          "description": "Whether to ingest the top_n_queries.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    }
  }
}
Code Coordinates
- Class Name: datahub.ingestion.source.sql_queries.SqlQueriesSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for SQL Queries, feel free to ping us on our Slack.