Skip to content

Instantly share code, notes, and snippets.

@siscia
Last active July 3, 2016 15:59
Show Gist options
  • Save siscia/66acd6df8cfa4d33c16914add0bdc823 to your computer and use it in GitHub Desktop.
Save siscia/66acd6df8cfa4d33c16914add0bdc823 to your computer and use it in GitHub Desktop.
\section*{Abstract}
L'esplosione di internet ha portato alla raccolta di un quantitativo di dati mai visto prima; la necessità di analizzare e trarre valore da questa grande quantità di dati ha reso fondamentale l'uso di applicazioni distribuite su più macchine.
L'uso di applicazioni distribuite rende però molto difficile stimare il tempo di completamento dei task sottomessi, metrica che invece è di estremo interesse sia per gli utilizzatori del cluster che per chi lo gestisce.
In questo lavoro ci siamo concentrati nella analisi del tempo di completamento di job MapReduce eseguiti su Hadoop usando il framework MARC.
I risultati ottenuti sono incoraggianti, ma non ancora utilizzabili per ottenere stime reali di tempi di completamento.
\section{Introduzione}
L'esplosione di internet ha portato alla raccolta di un quantitativo di dati mai visto prima; la necessità di analizzare e trarre valore da questa grande quantità di dati ha reso fondamentale l'uso di applicazioni distribuite su più macchine.
Gli algoritmi utilizzati per compiere queste analisi si basano quindi sulla divisione del lavoro da svolgere su diverse macchine comunicanti tra di loro. Questa divisione del lavoro rende possibile analizzare enormi quantitativi di dati in parallelo ma introduce nuove problematiche.
È necessario coordinare le diverse parti del sistema, muovere i dati dove questi possono essere analizzati, aspettare risultati intermedi spesso pronti in momenti diversi ed essere certi che i risultati ottenuti siano corretti.
Tutta questa complessità deve essere bilanciata per riuscire ad ottimizzare il ``throughput'' del cluster.
Per riuscire a scegliere i giusti tradeoff è necessaria molta esperienza ed il processo è difficilmente definibile scientifico. Spesso gli amministratori dei cluster si affidanno alle comunità di utilizzatori per capire come meglio ottimizzare il loro hardware.
Inoltre, a causa della grande complessità di questi sistemi, le variabili che possono influenzare le performance di un cluster sono più di quelle gestibili da una persona singola e con effetti spesso imprevisti.
Ad esempio la dimensione dei blocchi di memoria dove i dati sono salvati è una variabile che ha conseguenze molto estese sulle performance di un cluster.
Blocchi di memoria troppo grossi sono un bottleneck nella rete interna del cluster rendendo difficile spostare i dati dove questi possono essere analizzati e non permettono molta granularità nella divisione dei task tra le varie macchine.
D'altra parte blocchi troppo piccoli richiedono un enorme overhead di comunicazione tra le macchine fino ad arrivare alla situazione limite nella quale è più il tempo necessario a coordinare la computazione che non la computazione stessa.
In questo contesto il nostro lavoro si è concentrato sul riuscire a predirre i tempi di completamento di jobs Hadoop.
La piattaforma Hadoop è stata scelta perchè una delle più note e solide piattaforme per il calcolo distribuito sulla quale sono ormai basati molti altri sistemi per analisi distribuite.
Il concentrarsi nella predizione dei tempi di completamento è un primo fondamentale passo verso l' ottimizzazione dei cluster che è, al tempo stesso, di interesse sia accademico che industriale. Si pensi che al momento i proprietari di cluster Hadoop non hanno un modo esatto per sapere quando la loro computazione terminerà e quando le risorse attualmente utilizzate saranno libere per computazioni successive.
\section{Stato dell'Arte}
L' argomento non è trattato in modo intensivo in letteratura.
Fra la letteratura analizzata abbiamo osservato che solo il progetto Startfish \cite{starfish} , che si propone di ottimizzare il tempo di completamento di MapReduce jobs è correlato ai nostri obbiettivi.
Sfortunatamente le metriche che Starfish usa non sono più influenti nelle moderne versione di Hadoop il che lascia un importante vuoto nel mondo della computazione distribuita.
\section{}
\subsection{Hadoop}
Hadoop \cite{hadoop} è stato uno dei primi framework open source per applicazioni distribuite di successo; è pensato per essere utilizzato su grandi cluster di commodity hardware con l'assunzione che i fallimenti a livello sia hardware che software sono comuni e che dovrebbero essere gestiti automaticamente.
Hadoop è basato sul paradigma di computazione MapReduce il quale rende facile parallelizzare il carico su molto macchine ma non permette di esprimere tutti i modelli computazionali in modo efficiente.
La computazione dentro Hadoop è quindi divisa in 2 fasi, la fase di Map e quella di Reduce; la fase di Reduce è ulteriormente suddivisa in 3 fasi: Shuffle, Sort and Reduce.
La fase di map applica delle prime trasformazioni al dato che vogliamo analizzare per ottenere dei primi risultati parziali, la fase di reduce combina tutti i risultati parziali per ottenere il risultato finale.
La fase di shuffle è necessaria per spostare fisicamente i risultati parziali da dove sono stati prodotti a dove può essere applicato il reducer.
Il reducer, infine, aspettandosi gli argomenti ordinati rende necessaria la fase di sort.
\subsection{MARC}
MARC \cite{marc} è un framework distribuito per modellare il consumo di risorse.
MARC è basato sulla idea di configurazioni. Una configurazione è un insieme di parametri che definiscono lo stato del sistema. All'interno di una configurazione la risorsa in analisi viene consumata in modo costante rispetto al tempo.
Ad alto livello un sistema attraverserà varie configurazioni ed in ogni configurazione la risorsa in analisi verrà consumata in modo diverso.
Un esempio illuminante è rispetto alla batteria dei telefoni cellulari. Le configurazioni possono essere viste come l'insieme degli accessori utilizzati nel cellulare. Con l'uso del wifi la carica della batteria diminuisce in modo costante rispetto al tempo (ex. -10\%/hr), se oltre il wifi accendiamo anche il GPS allora la carica diminuirà più velocemente ma sempre in modo costante rispetto al tempo (ex. -35\%/hr).
La risorsa in analisi viene quindi modellata secondo una funzione autoregresiva lineare a tratti, dove l'attuale stato della risorsa dipende dallo stato della risorsa al passo temporale precedente più un certo contributo dato dalla configurazione in cui si trova il sistema.
L'obbiettivo di MARC è proprio quello di trovare il contributo con la quale ogni configurazione influisce sulla risorsa.
\subsection{Applicare MARC ad Hadoop}
Per applicare la metodologia MARC al tempo di completamento di Hadoop è necessario quindi riuscire a modellare il tempo di completamento con una funzione lineare a tratti che MARC riesce a gestire.
Osservazioni sui carichi di lavoro reali di Hadoop hanno mostrato che ogni fase di Hadoop (Map, Shuffle, Sort \& Reduce) si comporta in modo lineare come è facilemente osservabile dalla figura \ref{fig:lineare_a_tratti}.
\begin{figure}
\includegraphics[scale=0.5]{linearity}
\caption{Linearità nelle fasi di Haddop.}
\label{fig:lineare_a_tratti}
\end{figure}
Oltre alla fase di Hadoop abbiamo anche considerato altri fattori influenzati del tempo totale di computazione. È ragionevole aspettarci che un dataset più grande impieghi più tempo ad essere analizzato, similmente il numero di worker che lavorano sullo stesso job influenzerà il tempo di completamento, e ancora, il tipo di job eseguito su un dataset influenzerà il tempo totale di completamento.
Ad esempio una possibile fase nel nostro modello potrebbe essere identificata da:
\begin{enumerate}
\item dimension del dataset (possibili valori: 30Gb, 100Gb, 200Gb)
\item fase di Hadoop (Map, Sort, Shuffle, Reduce)
\item numero di reducers (1, 6, 12, 24)
\item workload (TeraSort, WordCount)
\end{enumerate}
\subsubsection{Parametri di Configurazione}
I parametri di configurazione realmente usati nel nostro modello sono 6:
\begin{description}
\item [Fase di Hadoop] In quale fase Hadoop si trova (Map, Shuffle, Sort, Reduce)
\item [Workload] Abbiamo analizzato due workload principali:
\begin{description}
\item [TeraSort] Ordinare un array di intero
\item [WordCount] Contare il numero di parole all'interno di un documento
\end{description}
\item [Dimensione del dataset] La dimensione del dataset in input alla computazione
\item [Blocksize] Dimensione dei blocchi di memoria fissa usati per memorizzare i dati da analizzare.
\item [Numero di reducers] Quanti worker erano destinati alla fase di reduce, il numero di worker destinati alla fase di map è definito dal rapporto $\frac{Dimensione}{Blocksize}$
\item [Percentuale di inizio della fase di reduce] A quale percentuale della fase di map deve iniziare la fase di reduce.
\end{description}
\section{Dati \& Analisi}
In questa sezione si descrive l'apparato sperimentale e come i dati sono stati raccolti ed analizzati.
In particolare partendo da delle tracce di esecuzione dei job su Hadoop dividiamo queste tracce nelle varie configurazioni di MARC usando i paramentri di configurazione appena descritti.
Su queste configurazioni stimiamo il tempo di completamento ed infine uniamo insieme i risultati per stimare il tempo di completamento totale.
Le stime vengono quindi confrontate con tempi di completamento reali.
\subsection{Apparato Sperimentale}
Il nostro apparato sperimentale è composto da 4 macchine con processore Intel Xeon con 4 core e 8 Thread virtuali e 24Gb di RAM.
Le macchine godono di una ottima connettività riuscendo a comunicare tra di loro con velocità nell'ordine dei Gbps.
\subsection{Raccolta dati}
Abbiamo raccolto dati sulle metriche di utilizzo delle singole macchine (CPU load, memoria usato, I/O) e la percentuale di completamento dei singoli job MapReduce.
L'approccio alla raccolta dei dati è stato simile in entrambi i casi, prima i singoli dati venivano scritti su file all'interno della macchina di test e in fasi successive della analisi i file venivano parsati, puliti, e infine salvati in un database SQLite per maggiore praticità.
Tutta la coordinazione tra le macchine avveniva tramite ssh, software necessario al corretto funzionamento di Hadoop.
La raccolta dei metriche di utilizzo è stata fatta usando un software opensource, dstat \cite{dstat}, che si occupa di leggere le metriche di utilizzo è scriverle su file. Abbiamo quindi creato un apposito demone linux che successivamente è stato installato su tutte le macchine.
La raccolta delle percentuali di completamento dei singoli job invece è stata realizzata configurando ad hoc il sistema di log di Hadoop. Abbiamo permesso ad Hadoop di scrivere log anche su STDOUT e infine abbiamo ri-direzionato STDOUT su file.
Per la gestione e coordinazione di tutto il sistema di raccolta dati è stato creato uno script bash apposito. Lo script si occupa di:
\begin{enumerate}
\item Generare il dataset su cui far girare la computazione di interesse secondo vari parametri passati in input.
\item Far partire il sistema di raccolta metriche.
\item Inizializzare il job di interesse.
\item Aspettare finchè il job non finisce.
\item Fermare il sistema di raccolta metriche.
\item Eliminare il dataset usato come input e quello generato come output
\item Aspettare un ragionevole lasso di tempo per essere certi che la computazione appena svolta non interferisca con la computazione successiva.
\end{enumerate}
A sua volta questo primo script bash veniva invocato da un ulteriore script che cicliva nello spazio dei paramentri di interesse.
\subsection{Analisi}
Una volta che i dati sono stati raccolti e salvati su file, in uno step successivo, i dati sono spostati dalle macchine remote su una macchina locale dove avviene la fase di pulitura.
Un parser scritto in python legge i vari file ed estrae le informazioni di interesse che vengono memorizzate in un database SQLite \cite{sqlite}.
\subsubsection{Visualizzazione}
Per capire quali sono le variabili di maggiore interesse per i nostri obbiettivi abbiamo iniziato visualizzando i dati.
Sono state create tre tipi di visualizzazioni dinamiche via web usando javascript e la libreria D3js \cite{d3}.
\begin{itemize}
\item La prima forma di visualizzazione, figura \ref{fig:time}, mostra le tracce di un solo job dati i paramentri. Abbiamo quindi il tempo sull'asse delle variabili indipendenti e la percentuale di completamento, sia della fase di map, che della fase di reduce e la somma delle due, sull'asse delle variabili dipendenti. Questa visualizzazione ci è stato molto utile per validare le assunzioni che avevamo sul modello Hadoop permettendoci di validare la linearità a tratti necessaria al modello MARC.
\item Il secondo grafico visualizza i vari job con dimensioni e colori relativamente alla velocità di completamento e clusterizza secondo i paramentri di esecuzione, è quindi possibile capire quali parametri sono più influenti sul tempo di completamento totale.
\item Il terzo grafico, in figura \ref{fig:aggregate}, mostra tempo medio e varianza per ogni configurazione usata. Si è reso necessario quando il numero di job da visualizzare è diventato eccessivo.
\end{itemize}
\begin{figure}
\includegraphics[scale=0.5]{visualization_time}
\caption{Visualizzazione percentuali}
\label{fig:time}
\end{figure}
\begin{figure}
\includegraphics[scale=0.5]{WordCount}
\caption{Visualizzazione aggregata, la dimensione dei cerchi è corralta alla velocità media di completamento, mentre il colore al numero di sample.}
\label{fig:aggregate}
\end{figure}
\subsection{Applicazione del modello}
Prima di applicare il modello MARC abbiamo visualizzato usando python e matplotlib \cite{matplotlib} i punti ottenuti in modo sperimentale dividendoli per fasi, aspettandoci di vedere tracce con pendenze molto simili come MARC si aspetta.
Con questa strategia abbiamo quindi evitato di lanciare le pesanti computazioni di MARC su dati e fasi che non avrebbero potuto generare buoni risultati.
MARC necessità di dati espressi in formato csv e di un file di configurazione xml per capire come interpretare i dati.
Il file di configurazione è statico ed è stato scritto a mano e leggermente modificato ad ogni iterazione.
I dati invece sono generati in modo automatico ogni volta leggendo il database SQLite, in questa fase vengono anche aggiunte informazioni, inferite dai dati, riguardanti la fase di computazione.
Il file generato viene quindi caricato via ftp sulle macchine MARC e la computazione lanciata in modo manuale.
\section{Risultati}
MARC, oltre al modello di computazione, ritorna anche l'errore abbinato al modello.
La nostra principale metrica è quindi stato l'errore che abbiamo cercato di minimizzare il più possibile.
Una volta che l'errore ha raggiunto livelli che abbiano ritenuto sufficientemente bassi abbiamo anche rappresentato la curva prevista del modello insieme ai dati reali usando python.
Abbiamo provato a migliorare il modello usando tecniche di Machine Learning (Regression Tree e Kernel Density Analysis) per riuscire a classificare in modo migliore le tracce dei nostri job ma con scarsi risultati.
I risultati ottenuti sono interessanti ma non ancora sufficientemente buoni da essere utilizzati per scopi pratici.
Abbiamo attribuito le scarse performance alla grande correlazione tra le variabili di input (specialmente la percentuale di completamento) e la variabile di output (tempo di completamento) che causa matrici mal condizionate e difficilmente invertibili all'interno di MARC stesso.
\section{Lavori futuri}
Grazie alla opportunità di presentare il nostro lavoro in aziende leader nel settore ci è stato esposto il problema di come stimare il tempo di completamento nel caso in cui siano presenti delle parte di computazione più lente delle altre. Il caso di ``ritardatari'' si mostra, a causa di motivi implementativi di Hadoop, come la percentuale di completamento che rimane ferma per molto tempo al 33\% o al 99\%.
In base alla nostra esperienza proponiamo due filoni di ricerca che pensiamo possono portare buoni risultati.
Il primo filone, dividendo le tracce di completamento in fasi, utilizza tecniche di Machine Learnign per stimare il tempo di completamento di ogni fase e poi riunire i risultati.
Questo metodo difficilmente riuscirebbe a stimare il reale tempo di completamento nel caso di ritardari.
Però sarebbe una applicazione di tecniche conosciute con solida teoria.
Il secondo filone sfrutta nuovamente MARC ma evitando di usare la percentuale di completamento come input e usando le metriche macchina per stimare il lavoro fatto in un intervallo di tempo definito.
Ex. La fase di Map è CPU bound, in un lasso di tempo dipendente dal tipo di computazione eseguita, dalla dimensione del dataset e dall'uso della CPU mi posso aspettare un determinato progresso.
Questo metodo potrebbe essere capace di stimare anche il caso di ritardatari, ma è un terreno abbastanza inesplorato.
\section{Ringraziamenti}
Ringrazio calorosamente tutti il NECST Lab per l'ambiente e l'atmosfera che riesce a creare. Ringrazio il professore Santambrogio per le opportunità che mi ha concesso. Ringrazio Marco Rabozzi per il fondamentale aiuto che mi ha fornito nel progetto. Ringrazio Matteo Ferroni per il supporto che mi ha dato nell'uso di MARC e ringrazio Andrea Corna e Andrea Damiani per la creazione di MARC e per i loro illuminanti commenti. Ringrazio Emanuela Furfaro per grande aiuto fornito nella analisi statistica dei dati.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment