Recently I was asked to provide a quick, efficient, and streamlined way of querying AWS CloudWatch Logs via the AWS Console. These logs were already being streamed to an AWS S3 bucket, so I initially thought of simply interrogating them via CloudWatch Logs Insights. However, upon further investigation, I quickly saw some drawbacks to this option:

  1. CloudWatch Logs Insights has an output row limit of 10,000. This would cause problems in my case, as I was expecting certain reports to generate over 65,000 rows.

  2. The raw logs contained JSON fields, which would necessitate overly complicated queries to generate useful output.

  3. Insights uses its own query language, whereas my solution needed to support standard SQL.

In the end, AWS Glue was chosen to tackle the problem. Below is a step-by-step guide to the process.

What is AWS Glue?

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. You can create and run an ETL job with a few clicks in the AWS Management Console. You simply point AWS Glue to your data stored on AWS, and AWS Glue discovers your data and stores the associated metadata (table definition and schema) in the AWS Glue Data Catalog. Once catalogued, your data is immediately searchable, queryable, and available for ETL.

For more information specifically on AWS Glue, see the AWS Glue documentation.

Prerequisites

A summary of the upcoming steps is listed below:

  1. Create a database

  2. Using an AWS Crawler, generate a table to store the raw JSON data

  3. Define an ETL script; this will be used to restructure the raw data

  4. Using a second AWS Crawler, create a Parquet formatted table

  5. The Parquet formatted table is now ready to be queried via AWS Athena

As part of the process, two S3 buckets and an IAM role were required:

  1. An S3 bucket where the transform script and Parquet table are stored = s3://<glue-data-my-bucket>/

  2. A temporary location for AWS Glue config = s3://<glue-temp-my-bucket>/temp

  3. An IAM role for AWS Glue (created as part of the first crawler, below)

The step-by-step process

From the AWS Console, advance to the AWS Glue console.

Within the Data Catalog, create a database:

Database = acl-sec-db
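The same step can be scripted rather than clicked through. A minimal sketch using the Glue CreateDatabase API, with the request built by a pure helper (the boto3 call itself, which requires AWS credentials, is shown commented):

```python
def database_input(name):
    # The DatabaseInput structure expected by the Glue CreateDatabase API.
    return {"Name": name}

# With credentials configured, the console step above is equivalent to:
#   import boto3
#   boto3.client("glue").create_database(DatabaseInput=database_input("acl-sec-db"))
request = database_input("acl-sec-db")
```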

Now, using an AWS Glue Crawler, perform the following steps to create a table within the database to store the raw JSON log data.

Table – Add Table – Add tables using a crawler

Next

Crawler name = acl-sec-db-json-crawler

Next

Crawler source type = Data Stores

Next

Choose a data store = S3

Include path: s3://cwl-raw-data-s3-bucket

Next

Add another data store = No

Next

Associate a role (create one if required) = AWSGlueServiceRole-sec-db

Next

Schedule for crawler

Frequency = Run on-demand (for production this value can be changed to hourly/daily as required)

Next

Database = acl-sec-db

Next

Review and select Finish.

Now that the Crawler has been created, select and run it.
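Running the crawler can also be automated. A sketch of the start-and-wait loop, with the AWS call abstracted behind a callable so the polling logic is testable offline; with boto3 it would be `glue.start_crawler(Name="acl-sec-db-json-crawler")` followed by polling `glue.get_crawler(...)["Crawler"]["State"]`:

```python
import time

def wait_for_crawler(get_state, poll_seconds=15):
    """Poll a state-returning callable until the crawler is READY again.

    With boto3, get_state would be:
        lambda: glue.get_crawler(Name="acl-sec-db-json-crawler")["Crawler"]["State"]
    """
    while True:
        state = get_state()
        if state == "READY":
            return state
        time.sleep(poll_seconds)

# With credentials configured, the crawler is kicked off first:
#   glue = boto3.client("glue")
#   glue.start_crawler(Name="acl-sec-db-json-crawler")
```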

Upon completion, one table will be created.

To view the schema details of this table, click on the name of the table itself.

NOTE:

  1. Field partition_0 will need to be renamed in the Parquet-formatted table (this field contains the YEAR)

  2. Field partition_1 will need to be renamed in the Parquet-formatted table (this field contains the MONTH)

  3. Field partition_2 will need to be renamed in the Parquet-formatted table (this field contains the DATE)

  4. Field partition_3 will need to be renamed in the Parquet-formatted table (this field contains the HOUR)

  5. Field subscriptionfilters is an array; this will need to be converted into a string

  6. Field logevents is an array containing 3 fields; 3 separate fields will be created in the Parquet-formatted table for these

  7. Field logevents will then be removed
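The notes above can be illustrated in plain Python on one record in the shape CloudWatch Logs delivers to S3 (messageType, owner, logGroup, logStream, subscriptionFilters, logEvents); the sample values below are made up:

```python
# A record in the CloudWatch Logs subscription format (sample values are hypothetical).
raw = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "acl-sec-group",
    "logStream": "acl-sec-stream",
    "subscriptionFilters": ["acl-sec-filter"],
    "logEvents": [
        {"id": "0001", "timestamp": 1625097600000, "message": "event one"},
        {"id": "0002", "timestamp": 1625097660000, "message": "event two"},
    ],
}

def flatten(record, partitions):
    """Mirror the notes above: rename the partition fields, join
    subscriptionFilters into a string, and emit one row per log event."""
    base = {
        "messagetype": record["messageType"],
        "owner": record["owner"],
        "loggroup": record["logGroup"],
        "logstream": record["logStream"],
        "subscriptionfilters": ",".join(record["subscriptionFilters"]),
        "theyear": partitions[0],
        "themonth": partitions[1],
        "thedate": partitions[2],
        "thehour": partitions[3],
    }
    # One output row per entry in logEvents; the array itself is dropped.
    return [dict(base, **event) for event in record["logEvents"]]

rows = flatten(raw, ["2021", "07", "01", "00"])
```

The Glue ETL script later in this guide performs the same reshaping at scale with Spark.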

The Parquet-formatted table

Ultimately, the finished data will be kept in a Parquet-formatted table. Given the sheer amount of data, Parquet is an ideal format to use and provides several benefits:

  1. Better compression of data, resulting in space savings

  2. Reduced I/O, since queries read only the columns they need

  3. Reduced bandwidth for reads, as a result of the increased compression

What is Parquet formatted data? Parquet is a free and open-source column-oriented data storage format of the Apache Hadoop ecosystem. It is similar to the other columnar-storage file formats available in Hadoop, namely RCFile and ORC. It is compatible with most of the data processing frameworks in the Hadoop environment, and provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

For more information, see the Apache Parquet documentation.
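The benefits listed above come from Parquet's columnar layout. A toy illustration of the idea (not Parquet's actual encoding):

```python
# Row-oriented: each record stored together, as in the raw JSON logs.
rows = [
    {"loggroup": "sec", "thehour": "00"},
    {"loggroup": "sec", "thehour": "01"},
    {"loggroup": "sec", "thehour": "02"},
]

# Column-oriented: each field stored contiguously, as Parquet lays it out.
columns = {key: [r[key] for r in rows] for key in rows[0]}

# A repetitive column like loggroup compresses well, and a query touching
# only thehour can skip the loggroup bytes entirely.
```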

Now to create the Parquet-formatted table.

To do this, an ETL job is defined. This job will transform the original JSON data into Parquet format. Then, a second AWS Glue Crawler is defined. This Crawler will create a second table.

From the AWS Glue console:

ETL – Jobs (from the left-hand pane)

Click the Add Job button

Name = acl-sec-db-parquet-job

IAM role = AWSGlueServiceRole-sec-db (pick the Service role created earlier)

This job runs = A proposed script generated by AWS Glue

Script file name = acl-sec-db-parquet

S3 path where the script is stored = s3://<glue-data-my-bucket>/script

Temporary directory = s3://<glue-temp-my-bucket>/temp

Next

Select the data store (the JSON formatted table):

Next

Choose a transform type = Change Schema

Next

Choose a data target

Select ‘Create tables in your data target’

Data store = Amazon S3

Format = Parquet

Target path = s3://<glue-data-my-bucket>/parquet_results

The name of the subsequent table will be parquet_results.

At this point, the proposed column mapping is displayed.

Take this opportunity to make some changes.

Within the target partition the following changes are required:

  • Rename partition_0 to theyear

  • Rename partition_1 to themonth

  • Rename partition_2 to thedate

  • Rename partition_3 to thehour

  • For subscriptionfilters, change the data type from array to string

  • Select – Save job and edit script

At this point a default script is provided; I replaced this default script with one more in line with my needs.

My script does the following:

  1. Renames the date fields

  2. Concatenates 3 date fields into one field of data type ‘date’

  3. Creates 3 separate fields from the logevents array

  4. Removes the logevents field, which is no longer required.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import explode, concat, to_date, col, lit
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "acl-sec-db", table_name = "cwl-raw-data-s3-bucket", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "acl-sec-db", table_name = "cwl-raw-data-s3-bucket", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("messagetype", "string", "messagetype", "string"), ("owner", "string", "owner", "string"), ("loggroup", "string", "loggroup", "string"), ("logstream", "string", "logstream", "string"), ("subscriptionfilters", "array", "subscriptionfilters", "string"), ("logevents", "array", "logevents", "array"), ("partition_0", "string", "theyear", "string"), ("partition_1", "string", "themonth", "string"), ("partition_2", "string", "thedate", "string"), ("partition_3", "string", "thehour", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("messagetype", "string", "messagetype", "string"), ("owner", "string", "owner", "string"), ("loggroup", "string", "loggroup", "string"), ("logstream", "string", "logstream", "string"), ("subscriptionfilters", "array", "subscriptionfilters", "string"), ("logevents", "array", "logevents", "array"), ("partition_0", "string", "theyear", "string"), ("partition_1", "string", "themonth", "string"), ("partition_2", "string", "thedate", "string"), ("partition_3", "string", "thehour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
# Switch to a Spark DataFrame for the column-level transformations
df = dropnullfields3.toDF()
# Explode logevents into one row per event, pull its 3 fields up to the top level, then drop the array
df1 = df.select('*', explode('logevents').alias('logevents1')).select('*', 'logevents1.*').drop('logevents1', 'logevents')
# Concatenate the renamed partition fields into a single field of type 'date'
df2 = df1.withColumn("fulldate", to_date(concat(col('thedate'), lit('/'), col('themonth'), lit('/'), col('theyear')), 'dd/MM/yyyy'))
#df2.select('fulldate').printSchema()
#df2.select('fulldate').show(5)
dynf = DynamicFrame.fromDF(df2, glueContext, "dynf")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://glue-data-my-bucket/parquet_results"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dynf]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dynf, connection_type = "s3", connection_options = {"path": "s3://glue-data-my-bucket/parquet_results"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

Save

Run job (confirm by selecting Run job again in the dialog)

Once the job has completed, a status of success will be displayed:
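The job can also be started from code via the Glue StartJobRun API. A sketch with a pure helper building the request (the `--TempDir` argument points Glue at the temporary S3 location set up earlier); the boto3 calls are shown commented as they require credentials:

```python
def job_run_request(job_name, temp_dir):
    """Arguments for the Glue StartJobRun API call."""
    return {
        "JobName": job_name,
        "Arguments": {"--TempDir": temp_dir},
    }

# With boto3:
#   glue = boto3.client("glue")
#   run = glue.start_job_run(**job_run_request("acl-sec-db-parquet-job",
#                                              "s3://<glue-temp-my-bucket>/temp"))
#   # Poll until JobRunState is SUCCEEDED:
#   glue.get_job_run(JobName="acl-sec-db-parquet-job", RunId=run["JobRunId"])
```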

Add the Crawler and create Parquet table

Add the Crawler

Crawler name = acl-sec-db-parquet-crawler

Next

Crawler source type = Data Stores

Next

Choose a data store = S3

Select ‘Specified path in my account’

Include path = s3://glue-data-my-bucket/parquet_results

Next

Next

Choose an existing IAM role = AWSGlueServiceRole-sec-db

Next

Frequency = Run on-demand (for production this value can be changed to hourly/daily as required)

Next

Configure the crawler’s output

Database = acl-sec-db

Next

Finish

Run the Crawler

Upon a successful Crawler run, a second table will be available:

Table parquet_results has the following schema:

The above table can now be queried via AWS Athena.

Conclusion

We have taken an incoming stream of AWS CloudWatch logs which, in its original arrangement, contained JSON-formatted data comprising various datatypes. This was converted into a more homogeneous and efficient structure, better suited to our needs. We have also renamed certain fields, and split up or concatenated others, making the data more meaningful and easier to work with.

At this point, the data within the parquet_results table can be queried using standard SQL via AWS Athena.

e.g.

SELECT COUNT(*) FROM "acl-sec-db".parquet_results;

or;

SELECT * FROM "acl-sec-db".parquet_results LIMIT 10;

IMPORTANT: When issuing a SELECT query, especially on a large dataset, remember to include a LIMIT clause.
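Athena queries can likewise be submitted programmatically via the StartQueryExecution API. A sketch with a pure helper building the parameters; the output location below is a hypothetical S3 prefix where Athena would write the result CSV:

```python
def athena_query_request(sql, database, output_location):
    """Parameters for the Athena StartQueryExecution API call."""
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }

# With boto3 (the athena-results/ prefix is an assumed location):
#   athena = boto3.client("athena")
#   athena.start_query_execution(**athena_query_request(
#       'SELECT * FROM parquet_results LIMIT 10;',
#       "acl-sec-db",
#       "s3://<glue-data-my-bucket>/athena-results/"))
```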

For more information on AWS Athena, see the AWS Athena documentation.

Pricing

AWS Glue pricing guide

AWS Athena pricing