Cvičenie 10 - Spracovanie prúdov dát pomocou Apache Spark Streaming

Cieľom desiateho cvičenia je naučiť sa pracovať s neohraničenými prúdmi dát.

Spark Streaming

Spark Streaming umožňuje v prostredí Spark spracovávať dátové prúdy. Spark prúdy diskretizuje do tzv. mikro-dávok, ktoré sú potom vnútorne reprezentované ako RDD množiny. Na tieto prúdové RDD množiny je potom možné aplikovať RDD operácie z predošlých cvičení. Okrem nich, Spark Streaming rozširuje množinu operácií o operácie vykonávané nad časovými oknami.

Prvý príklad je Streaming Wordcount - jednoduchá úloha pre spočítanie slov pomocou Spark Streaming API.

Uložte si tento kód do samostatného skriptu, napr. NetworkWordcount.py. Pre otestovanie skriptu si otvorte nové okno terminálu. V ňom si najprv nainštalujte Netcat (utilitu pre sieťové služby). Budeme ju používať pre generovanie textového prúdu dát odosielaných na špecifiký sieťový port. Netcat nainštalujete príkazom:

sudo yum install nc

Spustite Netcat pomocou príkazu:

nc -lk 9999 

Zobrazí sa príkazový riadok kde môžete vkladať text, ktorý bude následne spracovávaný Spark skriptom po jednotlivých riadkoch. Prepnite sa do pôvodného okna terminálu a spustite váš skript pomocou príkazu spark-submit:

spark-submit --master local[2] NetworkWordcount.py 

Podstatné je spustiť skript s parametrom master nastaveným na local aspoň v dvoch vláknach - ak by ste spustili iba na jednom, mohlo by sa stať, že by ste nevideli výsledok (nezostalo by voľné vlákno pre Executor). Do druhého okna terminálu so spustenou utilitou Netcat zadajte text a sledujte výpis v konzole Spark-u.

Práca so štruktúrovanými prúdmi

V nasledujúcej úlohe si ukážeme ako prúdové dáta transformovat do štruktúrovanej podoby a ako s nimi pracovať pomocou dátových rámcov a SQL operácií. Spark vo verzi 2.0.0 a vyššie podporuje tzv. Structured Streaming, čo je spôsob spracovania prúdov pomocou dátových rámcov a SQL operácií. Môžeme tak spracovávat prúdové štruktúrované dáta. Spark ich vnútorne spracováva ako tzv. "unbounded tables" - neohraničené tabuľky - do ktorej sú nepretržite pridávané pribúdajúce záznamy.

Programátor potom špecifikuje výpočet podobným spôsobom ako pri práci s tradičnými dátovými rámcami. Rozdiel je jedine v špecifikácii dopytov - programátor musí aj samotný dopyt reprezentovať ako kontinuálny dátový prúd, ktorého výpočet Spark spustí ako inkrementálny dopyt na neohraničenej vstupnej tabuľke. Dopyt potom vygeneruje výsledok. Pri každom aktualizovaní tabuľky novými dátami (napr. každú sekundu) sa potom dopyt na pribúdajúcich dátach v jednotlivých dávkach prepočítava. Nasledujúci príklad demonštruje predchádzajúcu úlohu transformovanú pomocou Spark Structured Streaming.

Uložte si tento kód do samostatného skriptu, napr. StructuredNetworkWordcount.py. Skript otestujete rovnakým spôsobom ako v predošlom príklade.

V nasledujúcom príklade budeme pracovať s dátami zo zdroja PubNub. Verejný prúd z tohto zdroja obsahuje senzorické dáta nazbierané v rámci IoT platformy. Pomocou PubNub konektoru sa pripojíme na verejný zdroj senzorických prúdových dát a každý senzorický záznam bude prijatý ako JSON objekt s nasledovnými atribútmi:

'timestamp', 'ambient_temperature', 'humidity', 'photosensor', 'sensor_uuid', 'radiation_level'

Najprv spustíme konektor, ktorý nám umožní dáta zo zdroja načítať. Otvorte si nové okno terminálu a vo vašom domovskom adresári stiahnite skript Subscriber.py

wget http://people.tuke.sk/martin.sarnovsky/tsvd/files/Subscriber.py

Tento skript vám začne periodicky sťahovať dáta z PubNub zdroja a začne ich zapisovať do adresára /stream. Pre každý záznam vytvorí skrip Subscriber.py jeden JSON súbor. Predtým ako ho buďete môcť spustiť, musíte nainštalovať potrebný Python balík pre Pubnub. Najprv stiahnite a nainštalujte systém pre manažment python balíkov pip:

sudo yum install epel-release
sudo yum install gcc
sudo yum install python-pip

a potom môžete nainštalovať balík pre Pubnub:

sudo pip install pubnub==3.9.0

Nasledujúci skript obsahuje niekoľko úloh, ktoré stream takýchto objektov spracujú do dátového rámca pomocou Structured Streaming API a prezentuje niekoľko dopytov.

Pre otestovanie tejto úlohy spustite v separátnom okne terminálu Subscriber príkazom:

sudo python Subscriber.py

Potom spustite v druhom okne terminálu skript s dopytmi pomocou príkazu spark-submit. Spúšťajte ale vždy iba jeden streamovaný dopyt, nie viacero naraz a odsledujte výsledky. Subscriber ukončíte klávesovou skratkou CTRL + C, ak budete potrebovať vyčistiť stiahnuté dáta, tak ich v pod-adresári stream zmažte (ako root).

Viac informácií o spracovávaní prúdov a operáciách nájdete na tejto stránke (v angličtine). Viac o Spark Structured Streaming-u nájdete tu (v angličtine).

Úlohy

Úloha 10.1

Naštudujte si dokumentáciu k operáciám s posuvnými oknami a modifikujte skript NetworkWordcount.py tak, aby spočítaval výskyty slov za posledných 20 sekúnd v intervaloch každých 5 sekúnd. Skript spustite a otestujte. Rovnako sa v prvom príklade pohrajte s veľkosťou mikro-dávok špecifikovaných v Streaming Contexte.

Úloha 10.2

Naštudujte si dokumentáciu s Structured Streaming a rovnakú modifikáciu skriptu realizujte aj pre Streaming Wordcount úlohu z druhého príkladu pomocou Structured Streaming-u.

Úloha 10.3

V treťom príklade si zvoľte numerický atribút, ktorý diskretizujte. Pomocou takto vytvoreného kategorického atribútu napíšte streamovaný dopyt, ktorý bude zoskupovať senozory podľa jeho hodnôt a počítať priemerné hodnoty ostatných numerických atribútov pre tieto zoskupenia.