AWS Glue, Dev Endpoint and Zeppelin Notebook
AWS Glue is quite a powerful tool. What I like about it is that it's managed: you don't need to take care of infrastructure yourself, but instead AWS hosts it for you. You can schedule scripts to run in the morning and your data will be in its right place by the time you get to work.
The downside is that developing scripts for AWS Glue is cumbersom, a real pain in the butt. I first tried to code the scripts through the console, but you end up waiting a lot only to realize you had a syntax error in your code.
The documentation of glue programming is not great in my opinion. Also, the script on AWS Glue console differs slightly from the one you would run on the Dev Endpoint (e.g. Dev Endpoint doesn't know about Job objects or parameters).
So, it takes a bit of trial and error to get going, but once you do, knowing how to launch a Glue Dev Endpoint radically increases the speed at which you can iterate on your Glue ETL scripts.
Starting point for this post
Firstly, the point of a Glue dev endpoint is that you get a dedicated Glue instance, just for you, and you don't need to wait. What you need to know about a AWS Glue Dev Endpoint is:
- It's Reserved instances to you, they cost money when they're up
- It runs Spark. You can develop with scala or python (pyspark)
- You need to use SSL certificates and SSH tunneling to connect to your Dev Endpoint
Pre-requisites:
- An IAM role for the Glue Dev Endpoint with the necessary policies. E.g. AWSGlueServiceRole. There are more instructions here.
- You have a table in Glue Data Catalog and the necessary connections.
- I assume you know your way around VPC networking, security groups etc. E.g. the Dev Endpoint requires a security group that allows the port 22, since we need that for the SSH tunneling.
Have an SSL key pair at hand
- If you don't have a pair, you can create one under EC2 -> Network & Security -> Key Pairs
- You need the contents of the public key. The contents look something like this:
ssh-rsa AAAAB3NzaC1qph6lpY7D6MSTRIlnU/kc2EAAR4g5QK44bwTuNXPcEFijps1jy6yt243llWn3oScxfNQ/4+tR9m2LxAxkmJRVNcCLTnBTTd0uRxd5nLdGGDYB0xEGNT3VMjxCa2X19BH3gAABJQAAAQEAn6RSOBt/spb6J8HAza6sxjxBXRjAPr7Q66TB/xxA81/oNHWu8WT2eiI/XoVGLCzRPHzreoOytjuqd+u4gqXQM8tpJkAyxXGUm8dzNGIWCgHCvgS7TkcE3eZlQV6oTtK2Q7miJhDk58vqOEiUOHLnkqsgRQ7cVoHA27M+Ng7KlstGbMq1N6cLsOgTTuoioz6Y1V3rVDz2p73kvaYeDlLJg+eApw== rsa-key-20180821
- You need the private key (either .pem file when using ssh or .ppk when using PuTTY)
- If using PuTTY and you only have the .pem, convert .pem to .ppk (Putty Private Key)
Got the contents of your public key, and the .pem (ssh) or .ppk (PuTTY) at hand? Cool. Let's move forward.
Spin up the development endpoint
Create by going to AWS Glue -> Dev endpoints -> Add endpoint and you should see this:Give it any name
Choose an IAM role that has e.g. the AWSGlueServiceRole policy attached.
I'm only using 2 DPUs, since that's the minimum and you get charged in proportion to them.
Click Next.
If your Glue tables are not in S3, then you need to also add the VPC & security group info. Easiest is to choose the connection attached to the table you will be using, and let Glue figure it out for you.
When choosing your subnet, consider your VPC settings: your Dev Endpoint will need a public address.
Click Next.
Since we are going to use a local Zeppelin Notebook server, then you will need to provide the public SSH key for the Dev Endpoint. This is needed for establishing the SSH tunnel to the Dev Endpoint.
Go ahead, review and launch. It often takes close to 10 minutes for my Dev Enpoint to provision.
SSH tunnel for Glue Dev Endpoint
When your dev endpoint is provisioned, check that its network interface has a public address attached to it and take note of it (e.g. ec2-xx-xxx-xxx-xx.us-west-2.compute.amazonaws.com). If it doesn't, you will need to create an elastic IP and attach it to the network interface of the dev endpoint.
create SSH tunnel, using PuTTY:
putty.exe -ssh glue@ec2-xx-xxx-xxx-xx.us-west-2.compute.amazonaws.com `
-L 9007:169.254.76.1:9007 `
-i your_certificate.ppk
using ssh:ssh glue@ec2-xx-xxx-xxx-xx.us-west-2.compute.amazonaws.com \
-NTL 9007:169.254.76.1:9007 \
-i ./your_certificate.pem
Zeppelin Notebook
Download Zeppelin the newest version might not work (check this page), but the 0.7.3 does.Unpack to a folder.
Launch Zeppelin under bin. This can take a few minutes. When ready, it will output:
Done, zeppelin server started
The original instructions are here, but I'm paraphrasing:
Go to localhost:8080
Top right, click anonymous -> interpreter. Search for spark. Click Edit.
Have Connect to existing process checked
set host to
localhost
and port to 9007
Under properties, set master to
yarn-client
Remove
spark.executor.memory
and spark.driver.memory
properties if they exist.Save, Ok.
Write your first Glue script with Dev Endpoint
Under Notebook, click + Create new note and copy-paste below code.
Replace your database and the table name with your own (The ones in your Glue data catalog).
Notice the way I am creating the glueContext. Also notice how there's no Job object or arguments.
%pyspark
import sys
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
glueContext = GlueContext(SparkContext.getOrCreate())
# Load data and print its schema
datasource = glueContext.create_dynamic_frame.from_catalog(
database = "yourGlueDatabase",
table_name = "yourGlueTable")
datasource.printSchema()
Shift + Enter and your code will be shipped off to your Glue Dev Endpoint and in a while, you'll see the schema of your table below the cell.
Voilà. 🎉
As per Mike's comment below, what if you'd like to use code that works on both the Dev Endpoint and the "managed" Glue? Solved: The Glue code that runs on AWS Glue and on Dev Endpoint
I'm curious what you recommendation is from once you've done your script development in a notebook to translating it to a glue job. As you noted it's not a straight 1-to-1 copy paste. I'm new to the field and I was kind of hoping to be able to point a glue job to a notebook itself but it doesn't appear to be that easy. Do you you just stub in variables during notebook development for job object and arguments and replace them later in glue once you're ready to "deploy to production"? Any advice for development work flow would be appreciated.
ReplyDeleteThe way I do it is pretty simple, not the best way yet.
DeleteI structure my code in such a manner that the top part after imports I just replace with.
so instead of glueContext = GlueContext(SparkContext.getOrCreate())
i put
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
and then the job.commit() to the end.
If you use the args in your code, you can easily create a similar dictionary in the 'header' of your development code. What changes is glueContext, the job and args. Other details stay the same.
Actually, I'll improve on this. I'll provide a better way to structure your code so that the same code can be run both in dev endpoint and managed glue. That'll be another blog post I think. I'll link it back here when I'm done.
Thanks for the idea Mike!
Actually Mike, I decided to solve the challenge straight away. It help my own work too. Here's what I did: The Glue code that runs on AWS Glue and on Dev Endpoint. Enjoy!
DeleteHi Ilkka Peltola,
ReplyDeleteNice Article! When I login to dev end point using SSH and open pyspark shell, everything is working fine but I am getting below warning for every few seconds. Could you please let me know if you have faced this and any idea how to suppress these warnings? The statements are working fine though but the warning message is annoying.
19/05/18 03:06:41 WARN ServletHandler: /api/v1/applications/application_
java.lang.NullPointerException
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:845)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1689)
at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:164)
at org.spark_project.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1676)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:581)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:511)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:461)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:524)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:319)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:253)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:273)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:95)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)
The statements are working, but you're getting this error? That's odd.
DeleteUnfortunately no, I have not encountered this, but the first place I would go and check are the spark interpreter settings on the Zeppelin notebook server.