はじめに

このチュートリアルでは、MySQLのデータソースとAzure Databricksを対象に、Qlik ReplicateとQlik Compose for Data Lakesを利用してリアルタイムにデータを分析するためのData Lakeの設計・構築の手順をご紹介します。

【検証環境】

  • Qlik Replicate 6.6.0 SR2 on Windows
  • Qlik Compose for Data Lakes 6.6.0 SR3 on Windows
  • MySQL Community Edition 5.7.18 on Windows
  • Azure Databricks (14日間無料体験版)

※Azureアカウント、およびAzure Databricks Serviceアカウントがある前提で本チュートリアルは記述されています。
※Databricksで使用するクラスターやストレージについてはAzureの利用料金が発生します。実機検証の際は検証後に環境のクリーンアップをしていただくなど、コストの発生・増加には十分にご注意ください。

準備

Qlik Replicate–Databricksターゲットの利用の手順に従って、Replicateを使ったDatabricksへのレプリケート設定作業が完了している必要があります。ここでは、Qlik Compose for Data Lakesを使って、Replicateにより転送されたAzure Databricksのlandingゾーンのデータから分析用データを作成します。

Qlik ComposeとAzure Databricksの設定

JDBCドライバの設定

以下のURLからJDBCドライバをダウンロードし、Qlik Compose for Data Lakesがインストールされているマシンの”C:\Program Files\Attunity\Compose for Data Lakes\java\jdbc”ディレクトリにダウンロードしたZIPファイルに含まれるSparkJDBC41.jarファイルを設置しておきます。

https://databricks.com/wp-content/uploads/2.6.11.1014/SimbaSparkJDBC-2.6.11.1014.zip

※Databricks社のODBC/JDBCダウンロードサイトはJDBC and ODBC drivers and configuration parametersとなりますが、2020年10月時点では旧バージョンのドライバーを利用するため、上記サイトからのダウンロードを行っています。

ADLS Gen2ストレージへのStorageゾーンの作成

Landing領域にはQlik Replicateでターゲットとして指定したADLS Gen2の「replicate」コンテナを指定します。Storage領域には、同じADLS Gen2 Storage Accountに「compose」コンテナを新たに作成し、そちらを指定します。
Qlik Replicate–Databricksターゲットの利用でADLS Gen2のストレージにreplicateコンテナを作成した手順と同様に、Storage Explorerを使ってcomposeコンテナを作成し、その配下に「storage」ディレクトリを作成します。
Azure Databricksポータルに移動し、Qlik Replicate–Databricksターゲットの利用で作成したNotebookに新たにセルを追加し、以下のコードを実行します。これにより、composeコンテナをDatabricksにマウントします。
※<~~~~>の文字列は適宜これまでに取得した値に変更してください。
※前出のReplicateの設定時と比較して、以下のパラメータを変更する必要があります。
 <file-system-name>および<mount-name>→「compose」
 <directory-name>→「storage」
%python
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<application-id>",
"fs.azure.account.oauth2.client.secret": "<client-secret>",
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<directory-id>/oauth2/token"}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
source = "abfss://<file-system-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
mount_point = "/mnt/<mount-name>",
extra_configs = configs)
もう一つ新規のセルを追加し、以下のSQL文を実行します。「compose」というDBを作成しています。
※<~~~~>の文字列は適宜変更してください。
%sql
drop database if exists  <database-name>;
create database <database-name> location '<mount-point>';
<database-name>:  任意のDB名を付与します。ここでは「compose」とします。
<mount-point>: 一つ前の節で設定した「mount_point」の値を貼り付けます。ここでは「/mnt/compose」です。
Azure Databricksポータル左側メニューの[Data]タブから、composeというDBが作成されたことを確認します。

Replicateタスクの設定変更

Qlik Replicateのコンソールに移動し、MySQL2Databricksのタスク画面で[Task Settings]を開きます。

[Change Processing]配下の[Store Changes Settings]を開き、以下の設定を行います。

  • [Change Data Partitioning]の[Partition every:]にチェックする。
  • [Partition every:]の間隔を[1 Minute]にする。
  • [Partition Retention]にチェックする。
  • [delete partitions every:]の間隔を10 Hoursにする。
image

[Metadata]配下の[Control Tables]で、[Change Data Partitions]の[Enable]にチェックが入っていることを確認して[OK]をクリックします。

[Save]を選択してタスク設定の変更を保存します。その際、タスクを一度停止する必要がある旨の画面が表示されますが、[Stop and Save]を選択してください。
設定を変更したためターゲットフォルダのテーブルを一度Dropして、Full Loadからやり直します。[Run]横のプルダウンを表示し、[Reload Target]を選択します。確認画面がでますので[Yes]を選択します。

Composeプロジェクトの作成

Qlik Compose for Data Lakesのコンソールを開き、[Add New Project]を選択します。
以下を参考に設定し、[Create New Project]を選択します。
Name : Azure Databricks tutorial
Type : Databricks
[Project Type]に[Operational Data Store]を選択し、下の[Mark the matching Storage record as deleted]をチェックし、[OK]を選択します。

LandingとStorageへの接続の作成

Qlik Compose for Data Lakesの画面で、[LANDING AND STORAGE CONNECTIONS]領域の[Connections…]を選択します。
image
[Add New Storage]から、画像を参考にStorageの接続定義を作成します。
  • Name: 接続定義の名称を入力します。
  • Password : Databricksクラスターの画面で作成したTokenを使用します。
  • HTTP Path : DatabricksクラスターのODBC/JDBCタブで [HTTP path]を確認できます。
  • Host (HiveServer2) : DatabricksクラスターのODBC/JDBCタブで[Host Server]を確認できます。
  • [Database name:]の右の[…]をクリックし、先ほど設定した[compose]データベースを選択します。
  • [Test Connection]で接続テストを行い、問題なければ[OK]を選択します。
続いて、[+New…]→[Landing Connections from Repliocate Task]を選択して、Landingゾーンの接続設定を行います。
[Manage Replicate Servers…]を選択します。
[Add Replicate Server…]を選択します。
Qlik Replicateのサーバー情報を入力しテスト接続に問題がなければ[OK]を選択します。
Qlik Replicateのサーバーが登録されていることを確認したら、[Close]を選択します。
[Select Task…]を選択します。
登録したServer名を選択し、Replicate Tasksで紐づけたいタスクを選択し、[OK]を選択します。
[Data Landing Name:]を「Sales_landing」として[OK]を選択し、[Close]をクリックしてプロジェクトのホーム画面に戻ります。
これで、StorageとLandingの接続が完了し、Qlik ComposeとDatabricksを使って分析用データを作成する準備が整いました。

LandingゾーンからStorageゾーンへデータを移動

ここまでQlik Replicateを使ってAzure Databricksにデータをストリーム配信しました。次のフェーズは、データレイクのセットアップ、継続的な運用・管理です。典型的なデータレイクでは、初期導入の時間と労力のほとんどが、サードパーティのツールで使用するためのデータをキュレートし、高度な分析を実行するためのETL(Extract-Transform-Load)コードを書くことに費やされます。Qlik Compose for Data Lakesは、データレイクの構築と維持に関する時間のかかる作業を自動化します。メタデータ駆動型のアプローチにより、必要なETLスクリプトを生成し、データレイクのライフサイクル全体を自動的に管理します。その結果、ビジネスアナリストやデータアーキテクトは、データから洞察を得るまでの時間を短縮し、生産性を大幅に向上させることができます。

Storage Zoneの[Metadata]を選択します。

[Discover]を選択します。

[Search]を選択しLanding Zoneのテーブルを検索します。左下に検索結果のテーブル群が表示されますので、Storage Zoneに移動したいテーブルを一つずつ選択し[>]を押して右側に移動します。ここでは、「categories」、「customers」、「order_details」、「orders」、「products」、「shippers」、「suppliers」の7テーブルを移動します(「customer_alt_contact」、「employee」の2テーブルを除いています)。

テーブルの選択が完了したら[OK]を選択します。

メタデータ生成が自動的に始まります。作成が完了したのちに進捗状況が表示されている画面右下の[Close]を選択して閉じます。

メタデータを確認できるようになりました。

[order_details]エンティティを選択し、[+New Attribute]を選択します。

下の画像を参考にExpected UnitsInStockというAttributeをorder_detailエンティティに作成します。

[Validate]を選択します。指定したデータ変換内容によってStorage Zoneのデータに問題が起きないことをQlik Composeが検証します。検証に問題がなければ[Close]を押してValidateの画面を閉じます。

Storage Zoneの[Data Storage Tasks]を選択します。

[Sales_Landing]の[Map_order_details_Sales_landing]をダブルクリックします。

Landing領域とStorage領域のテーブルマッピングが表示されます。ほとんどの属性は自動的にマッピングされますが、新規に作成したExpectedUnitsInStockにはマップ定義がありません。ここでは、productsエンティティのUnitsInStock属性をLookUpします。ExpectedUnitsInStockにマウスオーバーすると表示されるLookUpアイコンをクリックします。

参照テーブルを選択します。ここでは[products]を指定し、[OK]を選択します。

[Condition]横の[Create Expression]を選択します。

こちらでLookUp条件を指定します。[Build Expression]に以下の数式を貼り付けます。ここではProductIDがイコールの値を参照するようにしています。必要に応じて[Parse Expression]で数式を検証し、問題がなければ[OK]を選択します。

${Lookup.ProductID}=${Landing.ProductID}

続いて[Result Column]横の[Create Expression]を選択します。

今度は、LookUpテーブル(productsテーブル)のどの値を参照するかを指定します。ここではUnitsInStockを指定したいので以下の数式を[Build Expression]に貼り付けます。問題がなければ[OK]を選択します。

${Lookup.UnitsInStock}

こちらがLookUpの設定結果です。[Preview Results]から結果を事前確認することができます。問題がなければ[OK]を選択します。

マッピングを確認したら[OK]を選択し、[Managed Data Storage Tasks]エリアに戻ります。

さらに右下の[Close]を選択し、プロジェクトのメイン画面に戻ります。[Create]を選択しテーブルを作成します。

[The Storage Zone tables were created successfully.]というメッセージを確認したら、[Close]を選択します。

先ほどの[Create]ボタンが[Validate]ボタンに変わります。必須の手順ではないですが、こちらの[Validate]を選択するとマッピングとストレージの同期状態を確認することができます。

再び[Data Storage Tasks]を選択し、[Manage Data Storage Tasks]の画面で[Generate]を選択します。初期フルロードのETLセットが生成されます。

続いて[Sales_landing_CDC]を選択し、[Generate]を選択します。変更分(CDC)のETLセットが生成されます。

続いてこの画面上で[Run]ボタンをクリックするとETLセットを実行することができますが、ここでは実行せず次節でモニター画面で実行を行います。[Close]をクリックしてプロジェクトのホーム画面に戻ります。

モニタービューからタスクを確認・実行

プロジェクトのメイン画面に戻り、右上の[Monitor]を選択します。

[Sales_landing (Type: Stoarge Full Load)]を選択して、[Run]を選択します。初期フルロードが開始されます。

画面下部で実行進捗状況をモニターできます。

完了後、Azure Databricksポータルの[Data]タブを確認すると、composeデータベースに各種テーブルが作成されていることが分かります。
image
[order_details]テーブルを選択します。データがロードされていること、また、追加で作成した「ExpectedUnitsInStock」属性が作成されていることが確認できました。

変更データ捕捉(CDC)の確認

ここで、ソースのMySQLにUPDATE文を実行します。Qlik ReplicateとQlik Composeが連携して、Databricksのデータに変更が反映されることを確認します。

MySQLのsalesスキーマのcustomersテーブルをSELECT文で確認すると、CustomerIDが’ALFKI’となっているレコードのContactNameをSQLのUPDATE文で名前を変更し(ここでは’Sho Nakajima’に変更)、[Apply]をクリックして適用します。

image

Qlik Replicateに移動しタスクのモニター画面で[Change Processing]タブを確認すると、sales.customersテーブルのUpdate文を捕捉したことが確認いただけます。

image

Azure Databricksポータルの[Data]タブから、replicateデータベースのcustomers__ct(変更履歴を保存するchange table )を確認します。変更前後のレコードが記録されていることが分かります。

image

今度はQlik Composeのモニター画面に移動し、Sales_landing_CDCのタスクを実行します。※実運用ではSchedule実行しますが、ここでは手動で実行します。

タスクが完了したら、Azure DatabricksポータルのNotebookから以下のような形でSelect文のクエリを実行し、composeデータベースのcustomersテーブルを確認すると、データが更新されたことを確認します。

%sql
select * from compose.customers where CustomerId = 'ALFKI';

image

以上のような形で、Qlikデータ統合ソリューションとAzure Databricksを組み合わせてリアルタイムかつアジャイルなデータレイクを構築いただけます。