Pakiet multidplyr, wrażenia

Screen Shot 2015-11-13 at 10.54.15

Dwa dni temu Hadley Wickham na twitterze podlinkował wprowadzenie do pakietu multidplyr. Przyjrzyjmy się temu backendowi.

Czym jest multidplyr?

Zgodnie z opisem na githubie, jest to biblioteka pozwalająca na przetwarzanie danych z użyciem dplyrowych czasowników z użyciem wielu rdzeni. Idea podobna do sparka. Podobne backendy istnieją od lat (dla hardkorowców RMPI, dla mniejszych distributeR czy paralel i wiele innych z listy https://cran.r-project.org/web/views/HighPerformanceComputing.html). Problem z istniejącymi rozwiązaniami jest ich hakerskość. W 9 przypadkach na 10, przy próbie robienia bardziej złożonych rzeczy wszystko wybucha, a traceback ma przynajmniej 20 pozycji.
Rozwiązania Hadleya, podobnie jak Appla, mają zazwyczaj przyjemniejszy design, wybuchają rzadziej, a kosztem ograniczonej funkcjonalności otrzymujemy jakąś frajdę z korzystania.

Niestety, multidplyr jest jeszcze młodym rozwiązaniem i czasem potrafi odebrać nadzieję. Ale też jest wiele rzeczy, które robi przyzwoicie, a czasem nawet bardzo dobrze i często bardzo szybko. W oficjalnej winietce znajduje się przykład analiz z danymi flights z pakietu nycflights13. Jeżeli dane nie są bardzo duże to koszt rozruszania klastra i rozproszenia danych przeważa koszt obliczeń. Ale już jeżeli danych byłoby więcej (dziesiątki milionów wierszy) lub też obliczenia byłyby bardziej złożone czasowo (np. gam’y) to rozproszenie może się opłacić.

Przykład

Akurat pracuję teraz nad systemem analizy logów z pewnych dziwacznych urządzeń. Logów jest masa (kilkaset milionów wierszy) i są porozpraszane po wielu plikach. Wykorzystałem multidplyr aby równolegle wczytywać dane i wykonywać wstępny preprocessing na 15 rdzeniach w gołym R. Dzięki temu udało się zejść z czasem przetwarzania z jednego dnia do dwóch godzin. Przyznacie, jest to sporym przyśpieszeniem, nawet jeżeli wliczyć czas potrzebny na poznanie multidplyr.

Inicjuje cluster z 15 węzłami, akurat by zapełnić wolne rdzenie.

cluster <- create_cluster(15)
## Initialising 15 core cluster.
set_default_cluster(cluster)

Zaczynam od znalezienia wszystkich plików z rozszerzeniem log. To z nich będę wyłuskiwał dane.

lf <- data.frame(files=list.files(pattern = "log", 
                       recursive = TRUE),
                  stringsAsFactors = FALSE)

Przygotowuję funkcję, którą będę wykorzystywał do wczytywania i przetwarzania danych.
Wysyłam tę funkcję na wszystkie węzły klastra.

readAndExtractTimepoints <- function(x) {
  tmp <- readLines(as.character(x)[1])
  ftmp <- grep(tmp, pattern="Entering scene", value=TRUE, fixed=TRUE)
  substr(ftmp,1,15)
}
 
cluster_assign_value(cluster, "readAndExtractTimepoints", readAndExtractTimepoints)

Rozpraszam listę z nazwami plików. Następnie dla każdego piku wywołuję już równolegle funkcje readAndExtractTimepoints. Wynik znajduje się w rozproszonej ramce danych (klasa party_df).

lf_distr <- lf %>% 
  partition() %>% 
  group_by(files) %>%
  do(timepoints =  readAndExtractTimepoints(.$files))
lf_distr
## Source: party_df [897 x 3]
## Groups: PARTITION_ID, files
## Shards: 15 [59--60 rows]
## 
##    PARTITION_ID                     files   timepoints
##           (dbl)                     (chr)        (chr)
## 1             1 2013/01/cnk02a/cnk02a.log <chr[14480]>
## 2             1 2013/01/cnk02b/cnk02b.log <chr[13761]>
## 3             1   2013/01/cnk06/cnk06.log <chr[26978]>
## 4             1   2013/01/cnk07/cnk07.log <chr[20198]>
## 5             1   2013/01/cnk09/cnk09.log <chr[26835]>
## 6             1   2013/01/cnk10/cnk10.log <chr[50527]>
## 7             1 2013/01/cnk100/cnk100.log  <chr[5191]>
## 8             1   2013/01/cnk11/cnk11.log <chr[27712]>
## 9             1   2013/01/cnk15/cnk15.log <chr[32364]>
## 10            1   2013/01/cnk16/cnk16.log <chr[15853]>

Teraz tylko trzeba zebrać dane z rozproszonej ramki z powrotem do R.

timeP <- collect(lf_distr)
str(timeP$timepoints)
## List of 897
##  $ : chr [1:144830] "Jan  1 08:15:57 " "Jan  1 18:04:37 " "Jan  1 18:05:44 " "Jan  2 08:15:57 " ...
##  $ : chr [1:123649] "Jan  1 08:16:05 " "Jan  2 08:16:05 " "Jan  2 09:46:08 " "Jan  2 09:46:13 " ...
##  $ : chr [1:137661] "Jan  1 08:15:57 " "Jan  2 08:15:57 " "Jan  2 09:34:47 " "Jan  2 09:35:45 " ...

Wrażenia

Pewnie dałoby się takie przetwarzanie zrobić jeszcze szybciej czy to robiąc parsowanie w pythonie czy wrzucając dane na sparka, ale jeżeli projekt nie jest duży, łatwiej utrzymać rozwiązanie zależne tylko od jednej technologii.

Ogólne wrażenie jest takie, że jest to bardzo dobry, ale wciąż prototyp. Wrażliwy na nieprzewidywane akcje (jak np. łańcuch operacji do()). Jednak znając siłę oddziaływania Hadleya myślę, że w kolejnej wersji będzie to już świetne narzędzie.

Wreszcie równoległe przetwarzanie w R będzie dla ludzi.

16 thoughts on “Pakiet multidplyr, wrażenia”

    1. RRO to wielowątkowy BLAS/LAPACK.
      Z pewnością przyśpieszy modelowanie bazujące na operacjach algebraicznych (czyli lm/glm i podobne).
      Miłe jest też to, że nie trzeba żadnych zmian w kodzie, podmienia się tylko biblioteki.
      Ale akurat w moim przypadku większość czasu przetwarzania to preprocessing logów, RRO tutaj nie pomoże (ale też nie zaszkodzi).

  1. Sporo na szybkości (rząd?) można zyskać dodając argument fixed=TRUE do grep’a:

    ftmp <- grep(tmp, pattern="Entering scene", value=TRUE, fixed=TRUE)

    A co do samej wielowątkowości to pakiet zapowiada się obiecująca, ale akurat takie problemy można rozwiązywać używając starszego plyr-a:


    library(doParallel)
    registerDoParallel(15) # albo library(doMC);registerDoMC(15)
    llply(lf$files, readAndExtractTimepoints, .parallel=TRUE)

  2. Bardzo ciekawy wpis. Mam pytanie z trochę innego poziomu. Czy możesz napisać czego dotyczą dane, które analizujesz? Oczywiście jeśli to nie tajemnica. Brzmi jak coś bardzo ciekawego a już szczególnie fragment „Entering scene” 🙂

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany. Wymagane pola są oznaczone *