| von Sakar Gurubacharya

Von API zu S3 Data Lake: ETL-Lösungen in AWS

Daten sind überall in den Unternehmen zu finden. Die effiziente Nutzung dieser Daten ist wichtiger denn je. Für diejenigen, die in den Bereichen Datenanalyse und Statistik arbeiten, sei es im Sport, im Finanzwesen oder im Marketing, ist das Sammeln, Verarbeiten und Speichern von Daten oft mühsam und zeitaufwändig. Glücklicherweise ist die Verwaltung großer Datenmengen mit der Entwicklung von Cloud-Plattformen wie AWS [1] leichter zugänglich und effizienter geworden. In diesem Artikel erfahren Sie, wie wir AWS-Dienste nutzen können, um statistische Daten aus einer externen API reibungslos zu extrahieren, in ein brauchbares Format umzuwandeln und mithilfe von ETL-Prozessen (Extract, Transform, Load) in einen AWS S3 Data Lake zu laden.
Der Prozess nutzt Amazon Web Services (AWS) Tools wie S3, Boto3 und Glue, um zu zeigen, wie man eine robuste und skalierbare ETL-Pipeline erstellt.

Zielsetzung: Datentechnik in der Cloud

Das Hauptziel besteht darin, eine Cloud-basierte Datenpipeline-Plattform zu verstehen und zu implementieren, die Daten für die Analyse effizient aufnimmt, umwandelt und speichert. Dieser Ansatz ermöglicht es Unternehmen, die Skalierbarkeit und Flexibilität von Cloud-Diensten zu nutzen und gleichzeitig große Datensätze nahtlos zu verwalten. Eines der leistungsfähigsten Tools in diesem Prozess ist die Möglichkeit, statistische Daten von einem externen API-Anbieter in einen S3-Bucket aufzunehmen. APIs (Application Programming Interfaces) ermöglichen uns den Zugriff auf strukturierte Daten aus verschiedenen Quellen - seien es Finanzmärkte, Kundeninteraktionen oder Betriebskennzahlen.

APIs sind das Rückgrat des modernen Datenaustauschs und bieten strukturierten Zugang zu einer Fülle von Informationen. API-Daten müssen jedoch häufig transformiert und angereichert werden, bevor sie effektiv für Analysen genutzt werden können. Gleichzeitig bieten Cloud-basierte Data Lakes, insbesondere solche, die auf Amazon S3 aufbauen, eine kostengünstige und skalierbare Speicherung großer Datenmengen. Die Überbrückung der Lücke zwischen APIs und S3 erfordert einen gut definierten ETL-Prozess.

Sobald die Daten aufgenommen und in Amazon S3 gespeichert wurden, sind sie bereit für Abfragen und Analysen. Durch die Nutzung von AWS-Services wie Athena für serverlose Abfragen und QuickSight für die Visualisierung können Sie Rohdaten schnell in wertvolle Erkenntnisse umwandeln. In den folgenden Abschnitten zeige ich Ihnen, wie Sie eine ETL-Pipeline implementieren, die Daten aus externen APIs nahtlos in Ihren Cloud Data Lake integriert und so die Grundlage für eine leistungsstarke, datengesteuerte Entscheidungsfindung schafft.

Überblick über den ETL-Prozess in AWS

Um das Konzept zu verdeutlichen, beginnen wir mit einem Überblick über die Architektur auf hoher Ebene.
ETL on the Cloud
Abbildung 1: Übersicht über die ETL in der Cloud

Der ETL-Prozess beginnt mit der Interaktion einer externen API, um Rohdaten zu extrahieren, normalerweise im JSON- oder XML-Format. Die API-Authentifizierungsdaten können mit AWS Secrets Manager verwaltet werden, um einen sicheren und kontrollierten Zugriff zu gewährleisten. Python-Skripte handhaben die Extraktionslogik, führen geplante API-Aufrufe durch, verwalten Ratenbegrenzungen und protokollieren Antworten zur Überwachung und Fehlersuche. Diese Skripte werden zur Versionskontrolle in GitHub gespeichert, was eine teamübergreifende Zusammenarbeit und Versionsverfolgung ermöglicht.

Um die Bereitstellung der Infrastruktur zu automatisieren, wird Terraform zur Definition und Bereitstellung von AWS-Ressourcen wie S3-Buckets, IAM-Rollen und Compute-Instances verwendet. Die extrahierten Rohdaten werden dann in einen AWS S3-Bucket aufgenommen, der als Data Lake, d. h. als zentrales Speicherrepository, fungiert. AWS-Services wie Glue oder Lambda können die Daten weiterverarbeiten und in strukturierte, für die Analyse optimierte Formate umwandeln. [2]

Diese Architektur gewährleistet Skalierbarkeit, Automatisierung und Sicherheit und macht die Datenübernahme aus APIs nahtlos und effizient. Durch die Nutzung von AWS-Cloud-Services kann die ETL-Pipeline umfangreiche Dateneingaben mit minimalem betrieblichem Aufwand verarbeiten. [3]

Mit diesem Überblick im Hinterkopf wollen wir uns nun die einzelnen Phasen des ETL-Prozesses genauer ansehen.

E = Extraktion: API-Aufrufe durchführen

Der erste Schritt des Prozesses ist die Datenextraktion, bei der die Rohdaten von einer externen API abgerufen werden. Zur Erleichterung dieses Vorgangs verwenden wir eine bereits vorhandene oder eine benutzerdefinierte, in Python geschriebene API-Client-Klasse, die die Logik für die Verwaltung der Interaktion mit der externen API kapselt. Diese Klasse ist für die Handhabung der OAuth- oder API-Schlüssel-basierten Authentifizierung, die sichere Verwaltung von Token oder Anmeldeinformationen und die Konstruktion dynamischer API-Anforderungs-URLs auf der Grundlage vordefinierter Endpunktmuster verantwortlich. Die URL-Struktur ist parametrisiert, um flexible Abfragen zu ermöglichen, die auf erforderlichen Datenfeldern wie Datumsbereichen oder bestimmten Datentypen basieren. Jeder API-Anbieter verfügt über einen eigenen Satz von Anmeldeinformationen und Endpunktkonfigurationen, die über Umgebungsvariablen oder Konfigurationsdateien in die Client-Klasse eingefügt werden. Sobald der Client ordnungsgemäß konfiguriert ist, wird eine GET-Anfrage unter Verwendung der Anfragebibliothek ausgeführt, um die relevanten Rohdaten, in der Regel im JSON- oder XML-Format, abzurufen. [4] Die Antwort wird dann geparst und validiert, bevor sie zur weiteren Verarbeitung an die ETL-Pipeline weitergeleitet wird.

[GET] /api/v1/data/endpoint?parameter1=value1&parameter2=value2

Abbildung 2: Ein Beispiel für einen GET-Endpunkt

In Abbildung 2 wird die GET-Anforderung an den Endpunkt /api/v1/data/endpoint gestellt, wobei parameter1 und parameter2 Abfrageparameter sind, die die angeforderten spezifischen Daten oder Filter definieren. Die API-Klasse konstruiert die vollständige URL dynamisch auf der Grundlage der Eingabeparameter und stellt die Anforderung. Die Antwort der API wird normalerweise im JSON-Format zurückgegeben und enthält die angeforderten Daten.

Bei der Arbeit mit APIs kann es aufgrund von Netzwerkproblemen, Authentifizierungsfehlern, Ratenbeschränkungen oder unerwarteten Datenformaten zu Fehlern kommen. Um eine stabile ETL-Pipeline zu gewährleisten, ist es wichtig, Wiederholungsmechanismen (z.B. exponentielles Backoff für fehlgeschlagene Anforderungen), Fehlerprotokollierung für die Fehlersuche und eine sichere Verwaltung von Anmeldeinformationen für die Behandlung von Authentifizierungsfehlern zu implementieren. Die Verwendung von AWS CloudWatch zur Protokollierung und Überwachung hilft bei der Verfolgung von API-Fehlern, während die sichere Speicherung von API-Anmeldeinformationen in AWS Secrets Manager Sicherheitsrisiken verhindert. Durch die Einbeziehung dieser Best Practices stellen wir sicher, dass die Datenextraktion auch bei API-Inkonsistenzen stabil und zuverlässig bleibt.

T = Transformation: Strukturierung und Aufbereitung von Daten für die Analyse

Sobald die Daten abgerufen sind, ist der nächste entscheidende Schritt - die Datenumwandlung. APIs geben die Daten in der Regel in komplexen, tief verschachtelten JSON-Formaten zurück, die für eine direkte Analyse oder Abfrage nicht ideal sind. Unser Ziel ist es hier, die JSON-Rohdaten zu normalisieren, zu bereinigen und zu filtern und sie in eine vereinfachte, tabellarische Struktur umzuwandeln, damit sie leichter in AWS S3 gespeichert und später mit Amazon Athena abgefragt werden können.

Um dies zu erreichen, verwenden wir Datenverarbeitungs-Frameworks wie Pandas, Spark DataFrame und Polars, der robusten Tools für die Arbeit mit strukturierten Daten bieten. Bei relativ einfachen und gut strukturierten JSON-Dateien helfen Funktionen wie json_normalize() in Pandas oder entsprechende Methoden in Spark und Polars dabei, verschachtelte JSON-Objekte rekursiv zu reduzieren und in einen zweidimensionalen DataFrame zu konvertieren, bei dem jedes verschachtelte Feld in eine eigene Spalte expandiert wird. Diese Transformation vereinfacht nicht nur die Daten, sondern stellt auch sicher, dass die resultierende Struktur für effiziente Abfragen in Systemen wie Athena optimiert ist. Durch die Konvertierung der Daten in ein Tabellenformat können wir sie schnell in S3 als Parquet- oder CSV-Dateien zur langfristigen Speicherung und weiteren Analyse laden.

Wenn der API-Aufruf jedoch eine recht komplexe JSON-Struktur zurückgibt, sind zusätzliche Schritte erforderlich, um sie zu normalisieren. Tief verschachtelte Felder, Nullwerte und Arrays von Objekten erfordern eine weitere Verarbeitung, um sicherzustellen, dass alle relevanten Informationen in einem flachen Tabellenformat dargestellt werden.

{
    "players": [
        {
            "id": 101,
            "name": "John Doe",
            "team": {
                "id": null,
                "name": "Team A",
                "coach": {
                    "name": "Jane Smith",
                    "experience_years": null
                }
            },
            "stats": {
                "season": "2023",
                "matches": [
                    {
                        "id": 1001,
                        "date": "2023-09-01",
                        "opponent": "Team B",
                        "performance": {
                            "goals": 2,
                            "assists": 1,
                            "minutes_played": 90,
                            "cards": {
                                "yellow": 1,
                                "red": 0
                            }
                        }
                    }
                ]
            }
        }
    ]
}

Abbildung 3: Ein Beispiel für eine komplexe JSON-Struktur

In diesem Fall müssen wir verschachtelte Felder wie „team“ und „stats“ glätten und Arrays wie „matches“ extrahieren. Dazu verwenden wir json_normalize von Pandas mit spezifischen Parametern, um mit verschachtelten Strukturen und Listen umzugehen.

df = pd.json_normalize(
	data['players'],
	record_path=['stats', 'matches'],
	meta=[
		['id', 'name', 'team.id'],
		['team', 'name'],
		['team', 'coach', 'name'],
		['team', 'coach', 'experience_years'],
		['stats', 'season']
	],
	meta_prefix='player_',
	record_prefix='match_'
)

Figure 4: Normalization process for complex JSON

Wir übergeben den Pfad zum verschachtelten Array „matches“ mit dem Parameter record_path und definieren Metadatenfelder wie Spieler-ID, Name, Mannschaftsdetails und Statistiken mit dem Parameter meta. Außerdem verwenden wir meta_prefix und record_prefix, um sicherzustellen, dass die Spalten richtig beschriftet sind und zwischen Daten auf Spieler- und Spielebene unterschieden wird.

Dennoch ist die Verflachung allein nicht immer ausreichend. In Fällen, in denen Datenfelder auf unterschiedlichen Hierarchieebenen existieren oder anhand bestimmter Kriterien zugewiesen werden müssen, ist eine fortschrittlichere Transformation - das Datenmapping - erforderlich. Dadurch wird sichergestellt, dass die Schlüsselattribute richtig strukturiert sind, auch wenn sie in der API-Rohantwort nicht perfekt übereinstimmen.

Bei komplexen Transformationen enthalten rohe API-Antworten oft tief verschachtelte Strukturen mit Schlüsselinformationen, die sich über mehrere Ebenen erstrecken, so dass ein Daten-Mapping erforderlich ist, um die Felder an ein strukturiertes Schema anzupassen. Im Gegensatz zum einfachen Flattening geht es hier um die selektive Extraktion und Umstrukturierung von Daten unter Beibehaltung der kontextuellen Beziehungen. Felder müssen möglicherweise neu zugewiesen oder aggregiert werden, um in das Zielschema zu passen, wobei die Transformationslogik entscheidet, ob sie beibehalten, abgeleitet oder verworfen werden sollen. Kontextabhängige Imputation füllt fehlende Werte durch Vererbung übergeordneter Attribute, Querverweise auf Datensätze oder Verwendung externer Metadaten auf. Die Filterung stellt die Datenintegrität sicher, indem unvollständige Datensätze zurückgewiesen und Konflikte aufgelöst werden. Durch die programmatische Zuordnung von Rohattributen und das intelligente Füllen von Lücken gewährleistet dieser Prozess Datenkonsistenz, Genauigkeit und Nutzbarkeit für nachgelagerte Analysen.

Stellen Sie sich beispielsweise eine API-Antwort vor, in der Details zu einer Meisterschaft, ihren Spielzeiten und registrierten Teams verstreut sind, wobei einige Attribute entweder fehlen oder inkonsistent formatiert sind:

{
    "championship": {
        "id": 101,
        "name": "Championship",
        "region": null
    },
    "seasons": [
        {
            "id": 202,
            "year": 2024,
            "teams": [
                {
                    "id": 303,
                    "name": "Club",
                    "category": "Professional",
                    "players": [
                        {
                            "id": 404,
                            "role": "Striker"
                        },
                        {
                            "id": 405
                        }
                    ]
                }
            ]
        }
    ]
}

Abbildung 5: JSON und Datenhierarchie auf mehreren Ebenen

Um diese Daten richtig zu strukturieren, bildet die Transformationslogik die verschachtelten Attribute in ein vereinfachtes Schema ab und wendet dabei kontextbezogene Füllregeln an:

mapped_data = []
for season in api_response["seasons"]:
    for team in season["teams"]:
        for player in team["players"]:
            transformed_record = {
                "championship_id": api_response["championship"]["id"],
                "championship_name": api_response["championship"]["name"],
                "championship_region": api_response["championship"].get("region", "Undefined"),
                "season_id": season["id"],
                "season_year": season["year"],
                "team_id": team["id"],
                "team_name": team["name"],
                "team_category": team.get("category", "Unknown"),
                "player_id": player["id"],
                "player_role": player.get("role", "Unassigned")
            }
            mapped_data.append(transformed_record)

Abbildung 6: Datentransformation für Mapping

Bei diesem Prozess wird die Meisterschaftsregion abgebildet, aber mit „Undefiniert“ belegt, wenn sie fehlt. Die Mannschaftskategorie wird auf „Unbekannt“ zurückgesetzt, wenn sie nicht angegeben wird. Die Spielerrolle wird mit „Nicht zugewiesen“ belegt, wenn sie nicht vorhanden ist. Die tief verschachtelte Struktur wird in ein strukturiertes Format umgewandelt, bei dem jede Zeile einen eindeutigen Spieler mit allen relevanten Kontextdaten darstellt.

L = Laden: Speichern von Daten in Amazon S3

Sobald die Daten in einen sauberen DataFrame strukturiert sind, besteht der nächste Schritt darin, sie in Amazon S3 zu laden. S3 dient aufgrund seiner Skalierbarkeit, Sicherheit und Kosteneffizienz als ideale Speicherlösung für Daten und ist damit ein wesentlicher Bestandteil der Datenpipeline. Die transformierten Daten werden programmatisch mit Boto3, dem AWS SDK für Python, hochgeladen, das eine Schnittstelle für die Interaktion mit AWS-Diensten, einschließlich S3, bietet. [6] Die Funktion put_object in Boto3 wird normalerweise verwendet, um den DataFrame als Datei in einen S3-Bucket hochzuladen.

Um den Speicherprozess zu rationalisieren und zu automatisieren, kann eine benutzerdefinierte Dienstprogrammklasse wie BotoUtils erstellt werden. Diese Klasse erweitert die Möglichkeiten von Boto3 und enthält spezialisierte Funktionen für allgemeine Aufgaben wie die Verwaltung von S3-Speicher und die Sicherstellung, dass die Daten ordnungsgemäß formatiert, benannt und gespeichert sind. Eine der wichtigsten Funktionen innerhalb von BotoUtils ist die Funktion handle_storage, die speziell dafür entwickelt wurde, den Upload-Prozess zu automatisieren und gleichzeitig andere wichtige Faktoren wie Dateiformate, dynamische Schlüsselgenerierung und historische Verfolgung zu verwalten.

Diese Funktion ist für mehrere Schlüsselaufgaben zuständig:

1. Behandlung von Dateiformaten: Anhand der Eingabeparameter wird das richtige Dateiformat (z. B. CSV, Parquet) für die Speicherung der Daten in S3 ermittelt. Dies ist besonders bei großen Datensätzen wichtig, da sich das gewählte Format direkt auf die nachgelagerte Leistung auswirkt, einschließlich der Geschwindigkeit von Abfragen und der Verarbeitungseffizienz.

2. Erzeugen eines dynamischen S3-Schlüssels: Der S3-Schlüssel (Dateipfad innerhalb des S3-Buckets) wird dynamisch mit _type, base_key, extra_path und dem historize flag generiert. Diese Struktur organisiert die Daten nach Reifegrad - roh oder verarbeitet - und sorgt für Nachvollziehbarkeit und effiziente Abfragen. APIs beeinflussen die Schlüsselmuster, wobei Batch-Daten geplanten Pfaden folgen (Jahr/Monat/Tag) und Streaming-Daten ereignisbasierte Strukturen verwenden. Transformationen und Historisierung verfeinern den Schlüssel weiter für Versionskontrolle und Abruf. Das Format folgt:

{_type}/{base_key.get("source")}/{base_key.get("endpoint_key")}/{extra_path}

Abbildung 7: S3-Schlüsselerzeugung

3. Optionaler historischer Pfad: Wenn das historize flag auf True gesetzt ist (Standardeinstellung), fügt die Funktion einen Zeitstempel an den Schlüssel an, um sicherzustellen, dass die Daten zeitlich getrennt gespeichert werden. Dies ist besonders nützlich, um Änderungen im Laufe der Zeit zu verfolgen. Der Zeitstempel wird in Jahr, Monat und Tag aufgeteilt, um eine klare Ordnerstruktur innerhalb des S3-Buckets zu erstellen.

if historize:
    now = self.startup_timestamp
    year = now.strftime('%Y')
    month = now.strftime('%m')
    day = now.strftime('%d')
    key += f'/year={year}/month={month}/day={day}/'

Abbildung 8: Historize Zeitstempel

4. Hochladen der Daten in S3: Sobald der Schlüssel vorbereitet ist, kann die Funktion die Methode Boto3 put_object verwenden, um das Objekt in den entsprechenden S3-Bucket hochzuladen. Bei dem Objekt handelt es sich in der Regel um einen transformierten DataFrame, der unter Verwendung des dynamisch generierten Schlüssels an den angegebenen Speicherort in S3 hochgeladen wird.

s3_client.put_object(Bucket=s3_bucket_name, Key=s3_key, Body-obj)

Abbildung 9: S3 Put Object

Die Funktion handle_storage automatisiert den Prozess der Organisation und des Hochladens von Daten in S3 und gewährleistet eine konsistente und fehlerfreie Speicherung. Durch die dynamische Generierung von S3-Schlüsseln und die Einbeziehung einer datumsbasierten Partitionierung erleichtert sie eine effiziente Datenverwaltung und eine einfache historische Nachverfolgung. Dieser Ansatz gewährleistet Skalierbarkeit und Leistung, insbesondere bei großen Datenmengen, und ist damit ideal für Analysen und die langfristige Datenspeicherung. Letztendlich werden Arbeitsabläufe rationalisiert und eine saubere, strukturierte Speicherlösung aufrechterhalten.

Sobald die Daten erfolgreich in S3 geladen wurden, sind sie für weitere Analysen bereit. Da die Daten in einem zugänglichen, strukturierten Format gespeichert sind, können Tools wie Amazon Athena und Amazon QuickSight die Daten direkt aus S3 abfragen und analysieren. Athena ermöglicht serverlose SQL-Abfragen, während QuickSight leistungsstarke Visualisierungsfunktionen bietet, um Daten in umsetzbare Erkenntnisse zu verwandeln. Diese Integration stellt sicher, dass die Daten mit minimalem Aufwand erforscht und für tiefergehende Analysen genutzt werden können, um datengesteuerte Entscheidungen zu ermöglichen.

S3 Loading
Figure 10: Analysis possibility after S3 Loading

Fazit

In diesem Artikel wurde der gesamte ETL-Prozess für die Aufnahme, Umwandlung und das Laden von Statistiken aus einer externen API in Amazon S3 untersucht. Dieser Workflow nutzt die leistungsstarken Funktionen von AWS und bietet eine skalierbare und flexible Lösung für die effiziente Verarbeitung großer Datensätze. Durch die Automatisierung des ETL-Prozesses werden die Daten leichter zugänglich und ermöglichen schnellere Einblicke und fundiertere Entscheidungen.

Die Woodmark Consulting AG ist darauf spezialisiert, Unternehmen bei der Entwicklung, Implementierung und Optimierung von ETL-Pipelines zu unterstützen.
Sprechen Sie mit uns, wie wir Sie bei der Rationalisierung Ihres Datenbetriebs, der Maximierung des Werts Ihrer Daten und der Erschließung leistungsstarker Erkenntnisse mit AWS unterstützen können!

Quellen

[1]   https://aws.amazon.com/
[2]   https://aws.amazon.com/datapipeline/
[3]   https://docs.aws.amazon.com/prescriptive-guidance/latest/patterns/build-an-etl-service-pipeline-to-load-data-incrementally-from-amazon-s3-to-amazon-redshift-using-aws-glue.html
[4]   https://www.restapitutorial.com/introduction/httpmethods
[5]   https://pandas.pydata.org/
[6]   https://boto3.amazonaws.com/v1/documentation/api/latest/index.html

Teile diesen Artikel mit anderen

Über den Autor

Sakar ist nach Abschluss seines Informatikstudiums seit März 2024 bei der Woodmark Consulting AG tätig. Mit seiner Spezialisierung auf Data Engineering und Analytics nutzt er seine Expertise, um effiziente Datenpipelines zu entwickeln, Cloud-Datenplattformen zu betreuen und Datenworkflows zu optimieren. Durch seine Zertifizierungen in AWS und kontinuierliche Weiterentwicklung in Databricks vertieft er stetig sein Fachwissen.

Zur Übersicht Blogbeiträge