Working with Postgres table in Glue ETL and calling stored procedures
Glue is king when it comes to simple, easily chained ETL that ties into the catalog.
Working with Postgres table in Glue ETL and calling stored procedures
Glue is king when it comes to simple, easily chained ETL that ties into the catalog. Glue Datacatalog opens many doors, and the Workflow ability to be used with features like Python Shell, Bookmarks, Crawlers etc. is really helpful for building simple and efficient operations. Based on my individual perspective, the advantage for glue is on the catalog management, you can use Glue crawler to crawler the data from different source and manage the data catalog easily. You can utilize glue ETL to do some simple work like data dump, format change and so on without very intensive memory requirement. Let's look at an example to read data from s3 and write to PostGres.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "default", table_name = "gluetesting", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "default", table_name = "gluetesting", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "col0", "string"), ("col1", "string", "col1", "string"), ("col2", "string", "col2", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [catalog_connection = "postgres", connection_options = {"dbtable": "gluetesting", "database": "chintan"}, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "postgres", connection_options = {"dbtable": "gluetesting", "database": "chintan"}, transformation_ctx = "datasink4")
job.commit()Also, if you want to execute SQL queries against the table in the Glue job while reading the data, you can do so using spark dataframe.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
sqlContext = SQLContext(sc)if you need to load complete table set query="employee" where employee is the table name in database.
here a query is used to filter the data at source itself.
query= "(select * from emp as employee where dob < '12/12/1996') as temptab"
datasource0 = sqlContext.read.format("jdbc").option("url", "jdbc:postgresql://gluepostgre.xxxxxxxxxxxx.ap-south-1.rds.amazonaws.com:5432/gluedb").option("driver", "org.postgresql.Driver").option("user", "xxxxxx").option("password", "xxxxxxx").option("dbtable", query).load()
datasource0.printSchema()
datasource0.count()
datasource0.show()
datasource0.write\
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://abcde-1.cvhtsxcbkj76.ap-northeast-1.rds.amazonaws.com:5432/dev")
.option("dbtable", "newtable")
.option("user", "postgres")
.option("password", "xxxx")
.mode("overwrite").save()Further to execute stored procedures we can make use of psycopg2. Glue natively doesn't have support for psycopg2.
Psycopg2 works with Spark version 2.4, Python 2, Glue 1.0 however it does not work with python 3 ( Spark version 2.4, Python 3, Glue 1.0). To work with python 2, we need to install the psycopg2-binary with python 2, pass the zipped module as extra python library under "Python library path" and run a Glue Spark job with Spark version 2.4, Python 2, Glue 1.0.
Basically you can build the module using the following steps -
mkdir psycopg2-binary
cd psycopg2-binary
pip install psycopg2-binary -t .
zip -r9 psycopg2.zip *ETL code using psycopg2
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import sys
import os
import zipfile
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
zip_ref = zipfile.ZipFile('./psycopg2.zip', 'r')
print(os.listdir('.'))
zip_ref.extractall('/tmp/packages')
zip_ref.close()
sys.path.insert(0, '/tmp/packages')
import psycopg2
print ("********************************")
##print(psycopg2._version_)
print(psycopg2.__version__)
print ("********************************")
job.commit()Alternatively if we need to use Glue Spark job, with Spark version 2.4, Python 3, Glue 1.0, you can use a pure-Python alternative of psycopg2 like pg8000 module - https://gist.github.com/rakeshsingh/709c700aa78aeff00ca5
This module has a dependency on another external ‘scramp’ package. So we will need to include both the modules in the python reference library path. We can accomplish this by adding comma-separated S3 paths of both of these ZIP files.
Create these ZIPs as below:
wget https://files.pythonhosted.org/packages/9f/92/9ecdc1ff8d67a872bc11bef8b8868da4637e2c980a4df9cf3267384c1f42/pg8000-1.13.2.tar.gz
tar -xvzf pg8000-1.13.2.tar.gz
cd pg8000-1.13.2
zip -r pg8000.zip pg8000
aws s3 cp pg8000.zip s3://<bucket>/<folder>/
wget https://files.pythonhosted.org/packages/f6/3c/7760673a2cabe85ccb2e92996991cda839baae022b2c6f72501c747699f4/scramp-1.1.0.tar.gz
tar -xvzf scramp-1.1.0.tar.gz
cd scramp-1.1.0
zip -r scramp.zip scramp
aws s3 cp scramp.zip s3://<bucket>/<folder>/We can then use the zips in our Glue job references python files and the import for pg8000 module should be successful.