AWS Lambda Function to run SQL queries in Redshift Cluster through AWS Redshift data API

Huzefa Khan
Jan 22, 2023


Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud provided by Amazon Web Services (AWS). Its engine is a SQL-compliant, massively parallel query processing and database management system designed for analytics workloads. It lets users store, analyze, and retrieve large amounts of data using SQL-based tools and business intelligence (BI) applications. Redshift uses a columnar storage format and advanced compression algorithms to provide high performance and efficient data storage. It also integrates with other AWS services such as S3, RDS, and EMR, and is commonly used for big data analytics, business intelligence, and data warehousing workloads.

The AWS console provides a Query Editor for running SQL commands interactively. For repeatable or automated runs, however, it is better to call the Redshift Data API from an AWS Lambda function.

RedshiftDataAPIService

A low-level client representing Redshift Data API Service

You can use the Amazon Redshift Data API to run queries on Amazon Redshift tables. You can run SQL statements, which are committed if the statement succeeds.

For more information about the Amazon Redshift Data API and CLI usage examples, see Using the Amazon Redshift Data API in the Amazon Redshift Management Guide.

import boto3

client = boto3.client('redshift-data')
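The Data API is asynchronous: `execute_statement` returns a statement Id, and the rows of a finished query are read separately with `get_statement_result`. As a minimal sketch, assuming the statement has already finished, the helper below flattens the typed cells of the response into plain Python values and follows `NextToken` pagination. The helper names `cell_value` and `rows_from_result` are illustrative and not part of the original article.

```python
from typing import Any, Dict, List


def cell_value(cell: Dict[str, Any]) -> Any:
    """Unwrap one typed cell such as {'stringValue': 'a'} or {'isNull': True}."""
    if cell.get("isNull"):
        return None
    # A non-null cell carries one typed key (stringValue, longValue, ...)
    return next(iter(cell.values()))


def rows_from_result(client, statement_id: str) -> List[List[Any]]:
    """Collect all rows of a finished statement, following NextToken pages."""
    rows: List[List[Any]] = []
    kwargs = {"Id": statement_id}
    while True:
        page = client.get_statement_result(**kwargs)
        for record in page["Records"]:
            rows.append([cell_value(cell) for cell in record])
        token = page.get("NextToken")
        if not token:
            return rows
        kwargs["NextToken"] = token
```

With the `client` created above, `rows_from_result(client, statement_id)` would return one list per row once `describe_statement` reports the statement as finished.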

Read SQL Queries

Let's start by creating a flat file that contains all the SQL commands we want to execute in the Redshift cluster. We will upload this file to an S3 bucket. Its content is:

DROP MATERIALIZED VIEW IF EXISTS my_view;

CREATE MATERIALIZED VIEW my_view AS
SELECT
    name
FROM my_table;

A materialized view is a database object that stores the precomputed results of a query. In a data warehouse, this speeds up repeated queries over very large tables.
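The upload step itself is not shown in the article. One way to do it, sketched here under assumptions, is a `put_object` call on a boto3 S3 client. The function name `upload_sql`, the bucket `my-sql-bucket`, and the key `sql/refresh_view.sql` are placeholders chosen for illustration.

```python
SQL_SCRIPT = """\
DROP MATERIALIZED VIEW IF EXISTS my_view;

CREATE MATERIALIZED VIEW my_view AS
SELECT
    name
FROM my_table;
"""


def upload_sql(s3_client, bucket: str, key: str, sql_text: str) -> None:
    """Store the SQL script as a UTF-8 text object in S3."""
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=sql_text.encode("utf-8"),
        ContentType="text/plain",
    )


# Example call (hypothetical bucket and key, requires AWS credentials):
#   import boto3
#   upload_sql(boto3.client("s3"), "my-sql-bucket", "sql/refresh_view.sql", SQL_SCRIPT)
```

Passing the client in as an argument keeps the helper easy to exercise with a stub, without touching a real bucket.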

Write Lambda Function

Let's start by creating the Lambda function handler, which is invoked each time the Lambda function runs.

import json
import logging
import os
import time
import traceback

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    message = 'Success'
    status_code = 200
    payload = ''

    try:
        # All settings come from the Lambda environment variables
        message = exec_redshift(os.environ['secret_name'].strip(),
                                os.environ['region'].strip(),
                                os.environ['sql_bucket'].strip(),
                                os.environ['sql_prefix'].strip())
    except Exception:
        # Log the failure and hand the traceback back to the caller
        logger.exception('Redshift execution failed')
        status_code = 400
        payload = traceback.format_exc()
        message = 'Failed'

    return {
        'statusCode': status_code,
        'message': message,
        'payload': payload
    }
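The handler reads four environment variables: `secret_name`, `region`, `sql_bucket`, and `sql_prefix`. If one of them is missing, the resulting `KeyError` only surfaces inside the returned traceback. As an optional sketch, a small helper can validate them up front and report every missing name at once. The helper name `load_config` is hypothetical and not part of the original code.

```python
import os
from typing import Dict, Mapping

# Environment variable names used by the handler in this article
REQUIRED_VARS = ("secret_name", "region", "sql_bucket", "sql_prefix")


def load_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the stripped settings, or raise if any are missing or blank."""
    missing = [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
    if missing:
        raise ValueError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name].strip() for name in REQUIRED_VARS}
```

Inside the handler, the resulting dictionary could then supply the four arguments of `exec_redshift`.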

It's not good practice to hard-code cluster information or passwords in the code. We will read this information from AWS Secrets Manager instead. I'll assume that the secret has already been created. We will write a method that retrieves its values using the secret_name passed in as an environment variable.

def get_secret_pwd(secret_name: str, region_name: str):
    """
    Retrieve secret from AWS Secrets Manager
    """

    # Create a Secrets Manager client
    session = boto3.session.Session()

    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except ClientError as e:
        # Whatever the error code (for example ResourceNotFoundException or
        # InvalidRequestException), log it and re-raise instead of carrying
        # on with an invalid response.
        logger.error('Could not read secret: %s', e.response['Error']['Code'])
        raise

    # The secret is stored as a JSON string
    secret = get_secret_value_response['SecretString']

    return json.loads(secret)
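The `exec_redshift` function later in this article reads the keys `ClusterName`, `databaseName`, and `username` from the parsed secret. A minimal sketch of checking that shape before using it is shown below. The helper name `parse_cluster_secret` and the sample values are assumptions for illustration.

```python
import json
from typing import Dict

# Keys that exec_redshift passes on to the Data API call
REQUIRED_KEYS = ("ClusterName", "databaseName", "username")


def parse_cluster_secret(secret_string: str) -> Dict[str, str]:
    """Parse the SecretString JSON and fail early if a key is absent."""
    secret = json.loads(secret_string)
    missing = [key for key in REQUIRED_KEYS if key not in secret]
    if missing:
        raise ValueError(f"Secret is missing keys: {missing}")
    return secret


# Hypothetical secret content, as it might be stored in Secrets Manager
example = '{"ClusterName": "my-cluster", "databaseName": "dev", "username": "etl_user"}'
print(parse_cluster_secret(example)["ClusterName"])
```

Failing early here gives a clearer message than a bare `KeyError` raised deep inside the execution step.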

We also need to read the SQL commands stored in S3.

def read_sql(bucket: str, prefix: str):
    """
    Read SQL script from s3
    """
    client = boto3.client("s3")

    # prefix is the full object key of the SQL file
    file_obj = client.get_object(Bucket=bucket, Key=prefix)

    return file_obj['Body'].read().decode("utf-8")

Lastly, we will write our method exec_redshift, which does the following:

1. Retrieve cluster information from AWS Secrets Manager

2. Read the content of the SQL file stored on S3

3. Execute the SQL commands sequentially in the Redshift cluster

def exec_redshift(secret_name: str,
                  region_name: str,
                  sql_bucket: str,
                  sql_prefix: str):
    """
    Execute SQL commands in redshift cluster
    """

    response = get_secret_pwd(secret_name,
                              region_name)

    # The password is not needed: passing DbUser makes the Data API
    # use temporary credentials
    cluster_name = response['ClusterName']
    db_name = response['databaseName']
    db_user = response['username']

    sql_statements = read_sql(sql_bucket, sql_prefix)

    redshift_data_client = boto3.client("redshift-data")

    result = redshift_data_client.execute_statement(
        ClusterIdentifier=cluster_name,
        Database=db_name,
        DbUser=db_user,
        Sql=sql_statements
    )
    statement_id = result['Id']

    # The Data API is asynchronous, so poll until the statement
    # reaches a terminal state
    while True:
        statement = redshift_data_client.describe_statement(Id=statement_id)
        status = statement['Status']
        if status in ('FINISHED', 'FAILED', 'ABORTED'):
            break
        time.sleep(2)

    if status == "FAILED":
        raise Exception(statement['Error'])

    return status
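The `Sql` parameter of `execute_statement` is documented as a single SQL statement, while the script above holds two. The Data API also offers `batch_execute_statement`, which accepts a list of statements and runs them in order as one transaction. The sketch below is an alternative, not the author's method. It splits the script on semicolons (a naive split that assumes no semicolons inside string literals or comments), submits the batch, and polls with a deadline so the loop cannot run past the Lambda timeout. The names `split_statements` and `run_batch` and the default timings are assumptions.

```python
import time
from typing import List


def split_statements(script: str) -> List[str]:
    """Naively split a script on ';' and drop empty fragments."""
    return [part.strip() for part in script.split(";") if part.strip()]


def run_batch(client, cluster: str, database: str, db_user: str, script: str,
              poll_seconds: float = 2.0, timeout_seconds: float = 600.0) -> str:
    """Submit all statements as one batch and wait for a terminal status."""
    result = client.batch_execute_statement(
        ClusterIdentifier=cluster,
        Database=database,
        DbUser=db_user,
        Sqls=split_statements(script),
    )
    deadline = time.monotonic() + timeout_seconds
    while True:
        statement = client.describe_statement(Id=result["Id"])
        status = statement["Status"]
        if status == "FINISHED":
            return status
        if status in ("FAILED", "ABORTED"):
            # Error may be absent for aborted statements, so fall back to the status
            raise RuntimeError(statement.get("Error", status))
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Statement {result['Id']} still {status}")
        time.sleep(poll_seconds)
```

In `exec_redshift`, a call such as `run_batch(redshift_data_client, cluster_name, db_name, db_user, sql_statements)` could stand in for the `execute_statement` call and its polling loop. Unlike the original, this variant also raises when the statement is aborted.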

References
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-data.html

https://docs.aws.amazon.com/redshift/latest/mgmt/data-api.html
