DynamoDB
Important Capabilities
| Capability | Status | Notes | 
|---|---|---|
| Detect Deleted Entities | ✅ | Optionally enabled via stateful_ingestion.remove_stale_metadata | 
| Platform Instance | ✅ | By default, platform_instance will use the AWS account id | 
This plugin extracts the following:
AWS DynamoDB table names and their regions, plus a schema of attribute names and types inferred by scanning each table
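As a rough illustration of what inferring a schema from scanned items can look like, the sketch below collects attribute names and types from a few items written in DynamoDB attribute-value format. The function name `infer_schema`, the `DYNAMODB_TYPES` mapping, and the sample items are hypothetical and are not taken from the plugin's code; the connector's actual logic may differ.

```python
from collections import defaultdict

# DynamoDB attribute-value type descriptors mapped to readable names.
DYNAMODB_TYPES = {
    "S": "string", "N": "number", "B": "binary", "BOOL": "boolean",
    "NULL": "null", "M": "map", "L": "list",
    "SS": "string set", "NS": "number set", "BS": "binary set",
}


def infer_schema(items):
    """Collect every attribute name and the types observed for it across items.

    Hypothetical helper for illustration only, not the plugin's implementation.
    """
    schema = defaultdict(set)
    for item in items:
        for name, typed_value in item.items():
            # Each value is a single-key dict such as {"S": "abc"}.
            for type_code in typed_value:
                schema[name].add(DYNAMODB_TYPES.get(type_code, type_code))
    return {name: sorted(types) for name, types in schema.items()}


# Made-up sample items; a real scan would return items from the table.
sample_items = [
    {"Id": {"S": "Amazon DynamoDB#DynamoDB Thread 1"}, "Views": {"N": "3"}},
    {"Id": {"S": "Amazon DynamoDB#DynamoDB Thread 2"}, "Tags": {"SS": ["a", "b"]}},
]
print(infer_schema(sample_items))
```

Because DynamoDB items in the same table can carry different attributes, an inferred schema only covers attributes that appear in the scanned items.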
Prerequisites
To run this source, attach the AmazonDynamoDBReadOnlyAccess policy to a user in your AWS account, then create an API access key and secret for that user.
To create an API access key, the user needs the following access key permissions. Your AWS account admin can create a policy with these permissions and attach it to the user. For more details, see Managing access keys for IAM users.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "iam:ListAccessKeys",
        "iam:CreateAccessKey",
        "iam:UpdateAccessKey",
        "iam:DeleteAccessKey"
      ],
      "Resource": "arn:aws:iam::${aws_account_id}:user/${aws:username}"
    }
  ]
}
CLI based Ingestion
Install the Plugin
pip install 'acryl-datahub[dynamodb]'
Starter Recipe
Check out the following recipe to get started with ingestion! See below for full configuration options.
For general pointers on writing and running a recipe, see our main recipe guide.
source:
  type: dynamodb
  config:
    platform_instance: "AWS_ACCOUNT_ID"
    aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
    aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
    #
    # If some items carry the most representative fields of the table, use the
    # `include_table_item` option to list their primary keys in DynamoDB format.
    # Each `region.table` entry can list at most 100 primary keys.
    # These items are included in addition to the first 100 items scanned from the table.
    #
    # include_table_item:
    #   region.table_name:
    #     [
    #       {
    #         "partition_key_name": { "attribute_type": "attribute_value" },
    #         "sort_key_name": { "attribute_type": "attribute_value" },
    #       },
    #     ]
sink:
  # sink configs
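The starter recipe reads its credentials from `${AWS_ACCESS_KEY_ID}` and `${AWS_SECRET_ACCESS_KEY}` placeholders. The sketch below is a small preflight check that reports which placeholders have no value in the environment before you run ingestion. The helper `missing_env_vars` is hypothetical and is not part of DataHub; it only scans the recipe text for `${NAME}` patterns.

```python
import os
import re

# Matches ${NAME} placeholders such as ${AWS_ACCESS_KEY_ID}.
PLACEHOLDER = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)\}")


def missing_env_vars(recipe_text, environ=None):
    """Return the sorted placeholder names that are unset or empty.

    Hypothetical helper for illustration; pass `environ` to check a custom mapping.
    """
    environ = os.environ if environ is None else environ
    names = sorted(set(PLACEHOLDER.findall(recipe_text)))
    return [name for name in names if not environ.get(name)]


recipe = """
source:
  type: dynamodb
  config:
    aws_access_key_id: "${AWS_ACCESS_KEY_ID}"
    aws_secret_access_key: "${AWS_SECRET_ACCESS_KEY}"
"""

# Example with an explicit mapping in which only one variable is set.
print(missing_env_vars(recipe, environ={"AWS_ACCESS_KEY_ID": "example"}))
```

Keeping the key and secret in environment variables avoids writing them into the recipe file itself.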
Config Details
Note that a . is used to denote nested fields in the YAML recipe.
| Field | Description | 
|---|---|
| aws_access_key_id ✅ string | AWS Access Key ID. | 
| aws_secret_access_key ✅ string(password) | AWS Secret Key. | 
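| include_table_item map(str,array) | [Advanced] The primary keys, in DynamoDB format, of table items to include when inferring the schema. See the "Advanced Configurations" section for details. | 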
| platform_instance string | The instance of the platform that all assets produced by this recipe belong to | 
| env string | The environment that all assets produced by this connector belong to Default: PROD | 
| table_pattern AllowDenyPattern | Regex patterns for tables to filter in ingestion. The table name format is 'region.table' Default: {'allow': ['.*'], 'deny': [], 'ignoreCase': True} | 
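| table_pattern.allow array(string) | List of regex patterns to include in ingestion. Default: ['.*'] | 
| table_pattern.deny array(string) | List of regex patterns to exclude from ingestion. Default: [] | 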
| table_pattern.ignoreCase boolean | Whether to ignore case sensitivity during pattern matching. Default: True | 
| stateful_ingestion StatefulStaleMetadataRemovalConfig | Base specialized config for Stateful Ingestion with stale metadata removal capability. | 
| stateful_ingestion.enabled boolean | Whether stateful ingestion is enabled. Default: False | 
| stateful_ingestion.remove_stale_metadata boolean | Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled. Default: True | 
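To make the `table_pattern` options more concrete, here is a minimal sketch of allow/deny filtering over names in the `region.table` format. It assumes that deny patterns take precedence over allow patterns and that patterns are matched from the start of the name; the function `is_allowed` and the table names are hypothetical, and the real AllowDenyPattern class may behave differently in edge cases.

```python
import re


def is_allowed(name, allow=(".*",), deny=(), ignore_case=True):
    """Approximate allow/deny filtering; illustrative only, not DataHub code."""
    flags = re.IGNORECASE if ignore_case else 0
    # Assumption: a name matching any deny pattern is excluded outright.
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    # Otherwise the name must match at least one allow pattern.
    return any(re.match(pattern, name, flags) for pattern in allow)


# Made-up table names in the 'region.table' format.
tables = ["us-west-2.Reply", "us-west-2.Thread", "eu-west-1.reply_archive"]
selected = [
    t for t in tables
    if is_allowed(t, allow=[r"us-west-2\..*"], deny=[r".*\.thread"])
]
print(selected)
```

Note that the `.` separating region and table is a regex metacharacter, so patterns that need a literal dot should escape it as `\.`.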
The JSONSchema for this configuration is inlined below.
{
  "title": "DynamoDBConfig",
  "description": "Any source that is a primary producer of Dataset metadata should inherit this class",
  "type": "object",
  "properties": {
    "stateful_ingestion": {
      "$ref": "#/definitions/StatefulStaleMetadataRemovalConfig"
    },
    "env": {
      "title": "Env",
      "description": "The environment that all assets produced by this connector belong to",
      "default": "PROD",
      "type": "string"
    },
    "platform_instance": {
      "title": "Platform Instance",
      "description": "The instance of the platform that all assets produced by this recipe belong to",
      "type": "string"
    },
    "aws_access_key_id": {
      "title": "Aws Access Key Id",
      "description": "AWS Access Key ID.",
      "type": "string"
    },
    "aws_secret_access_key": {
      "title": "Aws Secret Access Key",
      "description": "AWS Secret Key.",
      "type": "string",
      "writeOnly": true,
      "format": "password"
    },
    "include_table_item": {
      "title": "Include Table Item",
      "description": "[Advanced] The primary keys of items of a table in dynamodb format the user would like to include in schema. Refer \"Advanced Configurations\" section for more details",
      "type": "object",
      "additionalProperties": {
        "type": "array",
        "items": {
          "type": "object"
        }
      }
    },
    "table_pattern": {
      "title": "Table Pattern",
      "description": "Regex patterns for tables to filter in ingestion. The table name format is 'region.table'",
      "default": {
        "allow": [
          ".*"
        ],
        "deny": [],
        "ignoreCase": true
      },
      "allOf": [
        {
          "$ref": "#/definitions/AllowDenyPattern"
        }
      ]
    }
  },
  "required": [
    "aws_access_key_id",
    "aws_secret_access_key"
  ],
  "additionalProperties": false,
  "definitions": {
    "DynamicTypedStateProviderConfig": {
      "title": "DynamicTypedStateProviderConfig",
      "type": "object",
      "properties": {
        "type": {
          "title": "Type",
          "description": "The type of the state provider to use. For DataHub use `datahub`",
          "type": "string"
        },
        "config": {
          "title": "Config",
          "description": "The configuration required for initializing the state provider. Default: The datahub_api config if set at pipeline level. Otherwise, the default DatahubClientConfig. See the defaults (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19)."
        }
      },
      "required": [
        "type"
      ],
      "additionalProperties": false
    },
    "StatefulStaleMetadataRemovalConfig": {
      "title": "StatefulStaleMetadataRemovalConfig",
      "description": "Base specialized config for Stateful Ingestion with stale metadata removal capability.",
      "type": "object",
      "properties": {
        "enabled": {
          "title": "Enabled",
          "description": "The type of the ingestion state provider registered with datahub.",
          "default": false,
          "type": "boolean"
        },
        "remove_stale_metadata": {
          "title": "Remove Stale Metadata",
          "description": "Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    },
    "AllowDenyPattern": {
      "title": "AllowDenyPattern",
      "description": "A class to store allow deny regexes",
      "type": "object",
      "properties": {
        "allow": {
          "title": "Allow",
          "description": "List of regex patterns to include in ingestion",
          "default": [
            ".*"
          ],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "deny": {
          "title": "Deny",
          "description": "List of regex patterns to exclude from ingestion.",
          "default": [],
          "type": "array",
          "items": {
            "type": "string"
          }
        },
        "ignoreCase": {
          "title": "Ignorecase",
          "description": "Whether to ignore case sensitivity during pattern matching.",
          "default": true,
          "type": "boolean"
        }
      },
      "additionalProperties": false
    }
  }
}
Advanced Configurations
Using include_table_item config
If certain items contain the table's most representative fields, use the include_table_item option to provide a list of their primary keys in DynamoDB format. These items are included in addition to the first 100 items scanned from the table.
Take the example tables and data in the AWS DynamoDB Developer Guide: if an account has a table Reply in the us-west-2 region with the composite primary key Id and ReplyDateTime, you can use include_table_item to include two items as follows:
Example:
# The table name should be in the format of region.table_name
# The primary keys should be in the DynamoDB format
include_table_item:
  us-west-2.Reply:
    [
      {
        "ReplyDateTime": { "S": "2015-09-22T19:58:22.947Z" },
        "Id": { "S": "Amazon DynamoDB#DynamoDB Thread 1" },
      },
      {
        "ReplyDateTime": { "S": "2015-10-05T19:58:22.947Z" },
        "Id": { "S": "Amazon DynamoDB#DynamoDB Thread 2" },
      },
    ]
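If the primary keys live in a script or spreadsheet rather than in DynamoDB format, a small helper can generate the `include_table_item` entry. The sketch below is one possible approach: `build_include_table_item` and `to_attribute_value` are hypothetical names, only string and number keys are handled, and the 100-key limit is taken from the starter recipe comments.

```python
import json

MAX_KEYS_PER_TABLE = 100  # limit stated in the starter recipe comments


def to_attribute_value(value):
    """Wrap a plain string or number in DynamoDB attribute-value format."""
    if isinstance(value, bool):
        raise TypeError("key attributes must be strings or numbers")
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (int, float)):
        # DynamoDB represents numbers as strings inside the "N" descriptor.
        return {"N": str(value)}
    raise TypeError(f"unsupported key type: {type(value).__name__}")


def build_include_table_item(region, table, keys):
    """Build a mapping keyed by 'region.table' (hypothetical helper)."""
    if len(keys) > MAX_KEYS_PER_TABLE:
        raise ValueError(f"at most {MAX_KEYS_PER_TABLE} primary keys per table")
    return {
        f"{region}.{table}": [
            {name: to_attribute_value(value) for name, value in key.items()}
            for key in keys
        ]
    }


config = build_include_table_item(
    "us-west-2",
    "Reply",
    [{"ReplyDateTime": "2015-09-22T19:58:22.947Z",
      "Id": "Amazon DynamoDB#DynamoDB Thread 1"}],
)
print(json.dumps(config, indent=2))
```

The printed JSON uses the same flow style as the example above, so it can generally be pasted under `include_table_item` in the recipe.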
Code Coordinates
- Class Name: datahub.ingestion.source.dynamodb.dynamodb.DynamoDBSource
- Browse on GitHub
Questions
If you've got any questions on configuring ingestion for DynamoDB, feel free to ping us on our Slack.