はじめに

AWS Glueは、サーバーレスでスケーラブルなデータ統合サービスであり、ETL(Extract, Transform, Load)ジョブの実行に最適です。GlueはApache Sparkをバックエンドに持っており、Python APIであるPySparkを使ってデータを簡単に操作・加工できます。

この記事では、RDSRedshiftに保存されたデータをAWS Glueを使ってフィルタリングし、さらにカラム名を変更する方法を紹介します。

目次

  1. AWS Glueとは
  2. PySparkを使ったデータ加工のメリット
  3. RDSからのデータ処理例
  4. Redshiftからのデータ処理例
  5. まとめ

1. AWS Glueとは

AWS Glueは、ETLジョブを実行するために設計されたサーバーレスのサービスです。Glueは以下のような特徴を持っています。

Glueの主なコンポーネント


2. PySparkを使ったデータ加工のメリット

GlueのETLジョブでは、PySparkを使ってデータを扱います。PySparkを使うことのメリットは以下の通りです。


3. RDSからのデータ処理例

まず、RDS(例えばMySQLやPostgreSQL)からデータを取得して、フィルタリングとカラム名の変更を行う例を見てみましょう。

前提条件

サンプルコード

コードの解説

  1. データのフィルタリングdatasource.filter(f=lambda x: x["age"] > 50) では、age カラムが50以上のデータを抽出しています。
  2. カラム名の変更rename_field("old_name", "new_name") で、old_name というカラム名を new_name に変更しています。
  3. S3への書き込み:変換後のデータをS3に保存しています。

4. Redshiftからのデータ処理例

次に、Redshiftのテーブルからデータを取得して、同じようにフィルタリングとカラム名の変更を行う例を見てみましょう。

サンプルコード

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Redshiftからデータを取得
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "url": "jdbc:redshift://your-redshift-cluster:5439/yourdatabase",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "your-redshift-table-name",
        "redshiftTmpDir": "s3://your-temp-dir/"
    }
)

# データの変換例(フィルタリングとカラム名変更)
transformed_data = datasource.filter(f=lambda x: x["sales"] > 1000).rename_field("old_name", "new_name")

# Redshiftにデータを書き込む
glueContext.write_dynamic_frame.from_options(
    frame=transformed_data,
    connection_type="redshift",
    connection_options={
        "url": "jdbc:redshift://your-redshift-cluster:5439/yourdatabase",
        "user": "your-username",
        "password": "your-password",
        "dbtable": "your-output-table",
        "redshiftTmpDir": "s3://your-temp-dir/"
    }
)

job.commit()

コードの解説

  1. データのフィルタリングdatasource.filter(f=lambda x: x["sales"] > 1000) では、sales カラムが1000を超える行のみを抽出しています。
  2. カラム名の変更rename_field("old_name", "new_name") でカラム名を変更しています。
  3. Redshiftへの書き込み:変換後のデータをRedshiftの別のテーブルに保存しています。

5. まとめ

AWS Glueを使ってPySparkでRDSやRedshiftのデータを処理するのは、非常に強力かつ効率的です。データのフィルタリングやカラム名変更などの操作は、PySparkのシンプルなAPIを使用して簡単に実装できます。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です