はじめに
AWS Glueは、サーバーレスでスケーラブルなデータ統合サービスであり、ETL(Extract, Transform, Load)ジョブの実行に最適です。GlueはApache Sparkをバックエンドに持っており、Python APIであるPySparkを使ってデータを簡単に操作・加工できます。
この記事では、RDSやRedshiftに保存されたデータをAWS Glueを使ってフィルタリングし、さらにカラム名を変更する方法を紹介します。
目次
- AWS Glueとは
- PySparkを使ったデータ加工のメリット
- RDSからのデータ処理例
- Redshiftからのデータ処理例
- まとめ
1. AWS Glueとは
AWS Glueは、ETLジョブを実行するために設計されたサーバーレスのサービスです。Glueは以下のような特徴を持っています。
- サーバーレス:インフラを管理する必要がありません。
- スケーラブル:大量のデータでもスケーラブルに処理が可能です。
- Apache Sparkをベース:GlueはSparkエンジンを使用してETL処理を実行するため、PySparkを使ってデータ操作ができます。
Glueの主なコンポーネント
- Glue Crawler:データソースをスキャンし、スキーマを自動で検出してGlueデータカタログに登録します。
- Glue ETLジョブ:PythonやScalaを使ってデータを変換・処理するためのジョブです。
2. PySparkを使ったデータ加工のメリット
GlueのETLジョブでは、PySparkを使ってデータを扱います。PySparkを使うことのメリットは以下の通りです。
- スケーラビリティ:大規模なデータを分散処理で効率的に操作可能。
- 簡単なデータ操作:豊富なライブラリを使ってデータのフィルタリング、結合、集計を行える。
- AWSサービスとの統合:S3、RDS、Redshiftなどとシームレスに連携可能。
3. RDSからのデータ処理例
まず、RDS(例えばMySQLやPostgreSQL)からデータを取得して、フィルタリングとカラム名の変更を行う例を見てみましょう。
前提条件
- RDSインスタンスがVPC内にあること。
- AWS GlueジョブがそのVPCにアクセスできる設定がされていること(サブネット、セキュリティグループの設定)。
サンプルコード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
# RDSからデータを取得
datasource = glueContext.create_dynamic_frame.from_catalog(
database="your-rds-database-name",
table_name="your-rds-table-name"
)
# データの変換例(例えば、フィルタリングとカラム名変更)
transformed_data = datasource.filter(f=lambda x: x["age"] > 50).rename_field("old_name", "new_name")
# 加工後のデータをS3に書き込む
glueContext.write_dynamic_frame.from_options(
frame=transformed_data,
connection_type="s3",
connection_options={"path": "s3://your-bucket/output/"},
format="parquet"
)
job.commit()
コードの解説
- データのフィルタリング:
datasource.filter(f=lambda x: x["age"] > 50)
では、age
カラムが50以上のデータを抽出しています。 - カラム名の変更:
rename_field("old_name", "new_name")
で、old_name
というカラム名をnew_name
に変更しています。 - S3への書き込み:変換後のデータをS3に保存しています。
4. Redshiftからのデータ処理例
次に、Redshiftのテーブルからデータを取得して、同じようにフィルタリングとカラム名の変更を行う例を見てみましょう。
サンプルコード
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
# Redshiftからデータを取得
datasource = glueContext.create_dynamic_frame.from_options(
connection_type="redshift",
connection_options={
"url": "jdbc:redshift://your-redshift-cluster:5439/yourdatabase",
"user": "your-username",
"password": "your-password",
"dbtable": "your-redshift-table-name",
"redshiftTmpDir": "s3://your-temp-dir/"
}
)
# データの変換例(フィルタリングとカラム名変更)
transformed_data = datasource.filter(f=lambda x: x["sales"] > 1000).rename_field("old_name", "new_name")
# Redshiftにデータを書き込む
glueContext.write_dynamic_frame.from_options(
frame=transformed_data,
connection_type="redshift",
connection_options={
"url": "jdbc:redshift://your-redshift-cluster:5439/yourdatabase",
"user": "your-username",
"password": "your-password",
"dbtable": "your-output-table",
"redshiftTmpDir": "s3://your-temp-dir/"
}
)
job.commit()
コードの解説
- データのフィルタリング:
datasource.filter(f=lambda x: x["sales"] > 1000)
では、sales
カラムが1000を超える行のみを抽出しています。 - カラム名の変更:
rename_field("old_name", "new_name")
でカラム名を変更しています。 - Redshiftへの書き込み:変換後のデータをRedshiftの別のテーブルに保存しています。
5. まとめ
AWS Glueを使ってPySparkでRDSやRedshiftのデータを処理するのは、非常に強力かつ効率的です。データのフィルタリングやカラム名変更などの操作は、PySparkのシンプルなAPIを使用して簡単に実装できます。
- RDSやRedshiftとシームレスに統合されており、大規模なデータでもスケーラブルに処理可能。
- サーバーレスな環境でETLジョブを実行できるため、インフラ管理の負担が軽減されます。