Spark + R = SparkR

spark-project-header1
Spark podbija coraz więcej serc. Nic dziwnego, skoro z wielu stron płyną komentarze o znaczącym (o rząd wielkości) przyśpieszeniu czasu potrzebnego na analizę dużych zbiorów danych.
Mamy rozbudowany mechanizm składowania (cache) obiektów w pamięci, dzięki czemu wykonując iteracyjnie operacje na tych samych danych nie ma potrzeby katowania dysku.

Jak dla mnie pewną wadą tej platformy było to, że aplikacje w Sparku należało pisać w Java, Scala lub Pythonie. To bardzo fajne języki, ale używam wielu specjalistycznych narzędzi statystycznych dostępnych w programie R i nie chciałbym ich przepisywać na python a tym bardziej na Java.

Szczęśliwie jednak, powstał łącznik dla R, powalający na integracje R i Sparka. Takie połączenie zapączkowało ponad rok temu, zainicjował je Shivaram Venkataraman z Berkeley. Z czasem kilka innych osób dołączyło do rozwoju pakietu SparkR
http://amplab-extras.github.io/SparkR-pkg/.

Dziś podzielę się pierwszymi wrażeniami z używania tego pakietu.

Przykład użycia

Problem, na którym testowałem Sparka, wydawał się idealnie pasujący do jego sposobu przetwarzania. Mam DUUUUŻo krótkich szeregów czasowych (kilkaset obserwacji każdy) i chcę dla każdego niezależnie zbudować model regresyjny, do reszt zastosować model typu ARIMA a następnie wykonać predykcję.

Testy rozpocząłem na lokalnej testowej instalacji w skład której wszedł tylko 1 master i 1 worker (a więc zabawka) po 8GB RAM. Sam testowy spark to wersja pre-built dla hadoopa 2.3 https://spark.apache.org/downloads.html.

Wcześniej próbowałem pracować z paczką MapR (ze strony http://doc.mapr.com/display/MapR/MapR+Sandbox+for+Hadoop można pobrać obraz preinstalowanego MapRowego Hadoopa z całym zoo) ale z jakiegoś powodu ta wersja nie działała tak jakbym chciał. Pozostałem więc przy gołym Sparku na lokalnym systemie plików. Koniec końców chodziło mi głównie o przetestowanie pakietu dla programu R.

Początki były trudne. SparkR potrafił bez większych powodów rzucić wyjątkiem, co więcej stack-trace z Java niewiele mówił. Gdzieś po drodze coś się nie zgadzało.
Dokumentacja dla pakietu R jest taka sobie, więc trochę trwało zanim udało się odgadnąć co może być kluczem, co może być wartością, i co się dzieje gdy listy mają więcej niż dwie wartości.
Pewnych problemów dostarczył fakt, że funkcje w pakiecie SparkR mają tą brzydką cechę, że kolidują np. z funkcjami z pakietu dplyr. I tak na przykład jeżeli załadujemy pakiet dplyr po pakiecie SparkR to funkcja collect S3 z dplyr nadpisze funkcję collect S4 z SparkR. I dostaniemy czarno magiczny błąd.

Ale po etapie najeżonym błędami, przyszedł czas gdy rzeczy zaczęły działać. Podstawową strukturą na której w Spark się pracuje są pary klucz-wartość. Pracując w R wygodne jest to, że ta para to lista, a elementy listy mogą być dowolnymi strukturami. Możemy więc jako wartość przesyłać ramkę danych, model regresji lub inne obiekty R.

Przyjemne jest też to, że aby uruchomić kod R na Sparku nie trzeba dużego narzutu kodu. Większość inicjacji, serializacji i deserializacji obiektów jest poukrywana i użytkownik nie musi się tym przejmować.

Poniżej prezentuję przykład użycia, który miałem do wykonania udało się wykonać w kilkunastu liniach kodu, po usunięciu zbędnych szczegółów.
Jak już przebrnie się przez początkowe trudności okazuje się że korzystanie ze Sparka jest bardzo wygodne.

Sparkler

Funkcją sparkR.init() inicjuje się połączenie ze sparkiem, textFile() tworzy połączenie ze źródłem danych, czy to na hdfs czy w lokalnej strukturze katalogów.
Funkcja flatMap() bierze kolekcje wierszy i dzieli je na słowa. Jako wynik zwraca wektor słów (a dokładniej listę z jednym wektorem, taki sparkowy format).
Na tym obiekcie można wykonać funkcję lapply, która wykona się na sparku na każdym wektorze osobno. Funkcja ta z wektora słów wyciągnąć powinna to co ważne i zapisać jako listę z dwoma elementami – kluczem i wartością. W moich danych każdy wiersz to jeden pomiar zapotrzebowania na cechę X obiektu Y w chwili T. Jako klucz wybieram kolumnę z id X a jako wartość pamiętam listę Y i T.
Funkcja groupByKey() przetasowuje dane, tak by pary z tym samym kluczem były razem (wspólne id X).
Następnie map() przetwarza grupy, jako argument dostaje się listę której pierwszym elementem jest klucz a drugim jest lista wartości (tutaj listę wektorów). Taką listę łatwo już zamienić na ramkę danych, zrobić dla niej predykcje.
Aby wyniki predykcji ściągnąć do R można wykorzystać funkcję collect().

Cały przykład zadziałał. Wersja standalone sparka połączona przez SparkR niestety jest tragicznie tragicznie tragicznie tragicznie wolna.
Teraz zabieram się do testów z większym prawdziwym klastrem Sparkowym, mam nadzieję, że wymiana backendu to przyśpieszy przetwarzanie znacząco.

Wpisy z kategorii Duże i złożone powstają przy współpracy z firmą CodiLime.

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *