Dwa dni temu Hadley Wickham na twitterze podlinkował wprowadzenie do pakietu multidplyr. Przyjrzyjmy się temu backendowi.
Czym jest multidplyr?
Zgodnie z opisem na githubie, jest to biblioteka pozwalająca na przetwarzanie danych z użyciem dplyrowych czasowników z użyciem wielu rdzeni. Idea podobna do sparka. Podobne backendy istnieją od lat (dla hardkorowców RMPI, dla mniejszych distributeR czy paralel i wiele innych z listy https://cran.r-project.org/web/views/HighPerformanceComputing.html). Problem z istniejącymi rozwiązaniami jest ich hakerskość. W 9 przypadkach na 10, przy próbie robienia bardziej złożonych rzeczy wszystko wybucha, a traceback ma przynajmniej 20 pozycji.
Rozwiązania Hadleya, podobnie jak Appla, mają zazwyczaj przyjemniejszy design, wybuchają rzadziej, a kosztem ograniczonej funkcjonalności otrzymujemy jakąś frajdę z korzystania.
Niestety, multidplyr jest jeszcze młodym rozwiązaniem i czasem potrafi odebrać nadzieję. Ale też jest wiele rzeczy, które robi przyzwoicie, a czasem nawet bardzo dobrze i często bardzo szybko. W oficjalnej winietce znajduje się przykład analiz z danymi flights z pakietu nycflights13. Jeżeli dane nie są bardzo duże to koszt rozruszania klastra i rozproszenia danych przeważa koszt obliczeń. Ale już jeżeli danych byłoby więcej (dziesiątki milionów wierszy) lub też obliczenia byłyby bardziej złożone czasowo (np. gam’y) to rozproszenie może się opłacić.
Przykład
Akurat pracuję teraz nad systemem analizy logów z pewnych dziwacznych urządzeń. Logów jest masa (kilkaset milionów wierszy) i są porozpraszane po wielu plikach. Wykorzystałem multidplyr aby równolegle wczytywać dane i wykonywać wstępny preprocessing na 15 rdzeniach w gołym R. Dzięki temu udało się zejść z czasem przetwarzania z jednego dnia do dwóch godzin. Przyznacie, jest to sporym przyśpieszeniem, nawet jeżeli wliczyć czas potrzebny na poznanie multidplyr.
Inicjuje cluster z 15 węzłami, akurat by zapełnić wolne rdzenie.
cluster <- create_cluster(15) ## Initialising 15 core cluster. set_default_cluster(cluster) |
Zaczynam od znalezienia wszystkich plików z rozszerzeniem log. To z nich będę wyłuskiwał dane.
lf <- data.frame(files=list.files(pattern = "log", recursive = TRUE), stringsAsFactors = FALSE) |
Przygotowuję funkcję, którą będę wykorzystywał do wczytywania i przetwarzania danych.
Wysyłam tę funkcję na wszystkie węzły klastra.
readAndExtractTimepoints <- function(x) { tmp <- readLines(as.character(x)[1]) ftmp <- grep(tmp, pattern="Entering scene", value=TRUE, fixed=TRUE) substr(ftmp,1,15) } cluster_assign_value(cluster, "readAndExtractTimepoints", readAndExtractTimepoints) |
Rozpraszam listę z nazwami plików. Następnie dla każdego piku wywołuję już równolegle funkcje readAndExtractTimepoints. Wynik znajduje się w rozproszonej ramce danych (klasa party_df).
lf_distr <- lf %>% partition() %>% group_by(files) %>% do(timepoints = readAndExtractTimepoints(.$files)) lf_distr ## Source: party_df [897 x 3] ## Groups: PARTITION_ID, files ## Shards: 15 [59--60 rows] ## ## PARTITION_ID files timepoints ## (dbl) (chr) (chr) ## 1 1 2013/01/cnk02a/cnk02a.log <chr[14480]> ## 2 1 2013/01/cnk02b/cnk02b.log <chr[13761]> ## 3 1 2013/01/cnk06/cnk06.log <chr[26978]> ## 4 1 2013/01/cnk07/cnk07.log <chr[20198]> ## 5 1 2013/01/cnk09/cnk09.log <chr[26835]> ## 6 1 2013/01/cnk10/cnk10.log <chr[50527]> ## 7 1 2013/01/cnk100/cnk100.log <chr[5191]> ## 8 1 2013/01/cnk11/cnk11.log <chr[27712]> ## 9 1 2013/01/cnk15/cnk15.log <chr[32364]> ## 10 1 2013/01/cnk16/cnk16.log <chr[15853]> |
Teraz tylko trzeba zebrać dane z rozproszonej ramki z powrotem do R.
timeP <- collect(lf_distr) str(timeP$timepoints) ## List of 897 ## $ : chr [1:144830] "Jan 1 08:15:57 " "Jan 1 18:04:37 " "Jan 1 18:05:44 " "Jan 2 08:15:57 " ... ## $ : chr [1:123649] "Jan 1 08:16:05 " "Jan 2 08:16:05 " "Jan 2 09:46:08 " "Jan 2 09:46:13 " ... ## $ : chr [1:137661] "Jan 1 08:15:57 " "Jan 2 08:15:57 " "Jan 2 09:34:47 " "Jan 2 09:35:45 " ... |
Wrażenia
Pewnie dałoby się takie przetwarzanie zrobić jeszcze szybciej czy to robiąc parsowanie w pythonie czy wrzucając dane na sparka, ale jeżeli projekt nie jest duży, łatwiej utrzymać rozwiązanie zależne tylko od jednej technologii.
Ogólne wrażenie jest takie, że jest to bardzo dobry, ale wciąż prototyp. Wrażliwy na nieprzewidywane akcje (jak np. łańcuch operacji do()). Jednak znając siłę oddziaływania Hadleya myślę, że w kolejnej wersji będzie to już świetne narzędzie.
Wreszcie równoległe przetwarzanie w R będzie dla ludzi.
Jak już jesteśmy przy wielordzeniowym przetwarzaniu, co sądzi Pan o RRO, który mocno przyspiesza obliczenia macierzowe?
https://mran.revolutionanalytics.com/documents/rro/multithread/
RRO to wielowątkowy BLAS/LAPACK.
Z pewnością przyśpieszy modelowanie bazujące na operacjach algebraicznych (czyli lm/glm i podobne).
Miłe jest też to, że nie trzeba żadnych zmian w kodzie, podmienia się tylko biblioteki.
Ale akurat w moim przypadku większość czasu przetwarzania to preprocessing logów, RRO tutaj nie pomoże (ale też nie zaszkodzi).
Sporo na szybkości (rząd?) można zyskać dodając argument fixed=TRUE do grep’a:
ftmp <- grep(tmp, pattern="Entering scene", value=TRUE, fixed=TRUE)
A co do samej wielowątkowości to pakiet zapowiada się obiecująca, ale akurat takie problemy można rozwiązywać używając starszego plyr-a:
library(doParallel)
registerDoParallel(15) # albo library(doMC);registerDoMC(15)
llply(lf$files, readAndExtractTimepoints, .parallel=TRUE)
TRUE
😉
Bardzo ciekawy wpis. Mam pytanie z trochę innego poziomu. Czy możesz napisać czego dotyczą dane, które analizujesz? Oczywiście jeśli to nie tajemnica. Brzmi jak coś bardzo ciekawego a już szczególnie fragment „Entering scene” 🙂
To bardzo ciekawe dane i pracuję nad tym by były otwarte. Niedługo napiszę więcej i o nich i o wynikach z wstępnych analiz.
Czyżby logi z Pogromców ? 🙂
Nazwy plików powinny coś zdradzić, ale na razie nie mogę wiele więcej powiedzieć.
Złaknionym dłubaniem w logach polecam jutrzejszy wpis 😉
A już widzę, nie spojrzałem na nazwy 🙂
Ciekawe… choc takie BiocParallel to już tez „równoległe przetwarzanie w R dla ludzi” 🙂
Nie używałem, ale z dokumentacji mam wrażenie, że bplapply ma ten sam poziom abstrakcji co mclapply.
Ale skoro jest dla ludzi to przyjrzę się bliżej 😉
Może się przydać do testowania:
NYC Taxi & Uber – http://toddwschneider.com/posts/analyzing-1-1-billion-nyc-taxi-and-uber-trips-with-a-vengeance/
Fannie Mae & Freddie Mac mortgage data – http://toddwschneider.com/posts/mortgages-are-about-math-open-source-loan-level-analysis-of-fannie-and-freddie/
Ekstra,
w piątek mam zajęcia z Data Mining, zastanowię się jak przerobić te dane na zadanie z klasyfikacji
(może: odgadnij miejsce docelowe bazując na godzinie i miejscu startowym).
Ech, okazuje się, że z uwagi na wielkość danych raczej to zadanie na projekt niż na zajęcia (opisywany czas przetwarzania ~3 dni).
Zapomniałem jeszcze o tych danych, też mogą być interesujące do zabawy. 🙂
https://www.capitalbikeshare.com/trip-history-data
Dzięki,
szkoda, że nie ma takich danych dla veturilo.