קיטרתי – גם אני לא עשיתי זאת בעצמי (כמעט).
בפוסט זה ארצה לגעת בכמה מערכות מבוזרות – המאפשרות לפתור בעיות מבוזרות מבלי להיכנס לפרטים של “בחירת מנהיג” או “יצירת קונצנזוס”. בעיות רבות של עיבוד מבוזר – חוזרות על עצמן, וחבל להתחיל לפתור אותן מהתחלה.
ישנם שני סוגים של מערכות מבוזרות נפוצות שאני יכול לחשוב עליהן:
- מוצרים: אפליקציית לקוח של טורנט, Tor (רשת פרוקסי לאנונימיות), או משחק Massively Multi-player Online.
- Frameworks: המאפשרים פתרון בעיות מבוזרות ב”רמת הפשטה גבוהה”.
חישוב מבוזר
Scatter – Gather הוא השם המקובל לדפוס חישוב מבוזר, המבוסס על 2 רעיונות:
- חישוב הוא מהיר יותר על מערכת מקומית (גישה לזיכרון מהירה מגישה ברשת, גישה לדיסק מקומי לרוב תהיה מהירה מגישה לדיסק מרוחק) – ולכן נבצע את החישוב היכן שנמצאים הנתונים.
- למחשב יחיד אין דיי שטח אכסון לכל הנתונים של “חישוב גדול”, וגם אם היה לו – לא היה לו את כח החישוב המספיק לעבד את כל הנתונים הללו בזמן סביר.
נ.ב. “זמן סביר” הוא יחסי. לא נדיר להיתקל בשאילתות שרצות על מערכות Hadoop במשך שבועות – עד לסיום החישוב.
Map Reduce – הוא היישום הידוע של פרדיגמת Scatter-Gather, שנחשב היום אמנם כבוגר – ומובן היטב.
הרעיון הוא לחלק את החישוב המבוזר לשני שלבים:
- Map – עיבוד נתונים נקודתי על המכונה הלוקאלית (חיפוש / פילטור / עיבוד נתונים וכו’)
- Reduce – ביצוע סיכום של החישוב וחיבור התשובות של המחשבים הבודדים לתשובה אחת גדולה.
.
Google Map-reduce ו Hadoop מבוססים על מערכת קבצים מבוזרת (GFS ו HDFS בהתאמה), ותשתית לביצוע חישוב מבוזר על גבי ה nodes שמחזיקים את הנתונים הנ”ל (ב Hadoop נקראת YARN, בגוגל כנראה פשוט “Mapreduce”).

ניתן לכתוב פונקציות “Map” ו “Reduce” בקוד, אך סביר יותר להניח שכאשר אתם מגדירים חישובי Map-reduce רבים – תעדיפו להשתמש ברמת הפשטה גבוהה יותר כגון אלו שמספקים Pig (מן “שפת תכנות” מיוחדת ל Map-Reduce) או Hive (המבוסס על SQL).
כדרך אגב: Pig קיבל את שמו מכיוון שחזירים אוכלים גם צמחים וגם בשר – ו Pig יודע לטפל גם ב structured data וגם ב unstructured data.
דוגמה נוספת לדפוס של Scatter – Gather הוא הדפוס בו עושים שימוש ב Spark. אני לא בטוח שזהו שם רשמי – אך אני מכיר את המינוח “Transform – Action” (כאשר “map” הוא פעולת transform אפשרית, ו “reduce” הוא action אפשרי).
Spark הוא הכוכב העולה של עולם ה Big Data. בניגוד ל Hadoop הוא מאפשר (כחלק מהמוצר) מספר רב יותר של פונקציות מאשר “map” ו “reduce”.
במקום פעולת ה map ניתן להשתמש בסמנטיקות כגון map, filter, sample, union, join, partitionby ועוד…
במקום פעולת ה reduce ניתן להשתמש בסמנטיקות כגון reduce, collect, count, first, foreach, saveAsXxx ועוד.
מעבר לגמישות הזו – לא חייבים לתאר את הבעיה כ-2 שלבי חישוב, אלא ניתן להגדיר גרף חישוב גמיש כמעט לחלוטין.
עניין זה חשוב – כי יש בעיות חישוביות שפשוט לא מתמפות יפה ל 2 השלבים הברורים האלו. למשל: כאשר יש צורך להשתמש באותם נתוני-מקור כמה פעמים במהלך תהליך החישוב.
ל Spark יש עוד שתי תכונות חשובות:
- הוא מחזיק (סוג של Cache) את הנתונים בזיכרון – אליו הגישה מהירה בסדרי גודל מאשר בדיסק.
- הוא מרכז את רוב שרשרת החישוב באותו node ומצמצם את הצורך להעביר נתונים בין nodes ב cluster (גם ל Hadoop יש כלים שעוזרים לצמצם העברות של נתונים בין nodes, אך Map-reduce פחות מוצלח בכך באופן טבעי)
- ה “agent” של Spark רץ במקביל למערכת ה storage המבוזר, תהיה זו Hadoop או Cassandra (או סתם קבצים בפורמט נתמך) – ושואב ממנו את הנתונים הגולמיים לחישוב.
- ה Agent (נקרא node – אני נמנע משם זה בכדי למנוע בלבול) מקבל Job (לצורך העניין: קטע קוד) לבצע חישוב מסוים על הנתונים. יש כבר ב Agent קוד של פונקציות שימושיות (filter, sort, join, וכו’ – לפעולות “map” או count, reduce, collect – לפעולות “reduce”).
- ה Agent מחלק את המשימה ל stages ובונה גרף חישוב (על בסיס הידע כיצד מאורגנים הנתונים ב cluster) – ושולח tasks חישוביים ל Agents ב cluster.
Event Processing
Spark ו Hadoop הם מנועים לחישוב אצוות (batch): יש טריגר מסוים שמתחיל את החישוב – והחישוב מתבצע לאורך זמן מסוים (עדיף שיהיה קצר ככל האפשר, אך הוא לא מוגבל בזמן) – עד שמתקבלת התוצאה.
זן אחר של חישוב הוא חישוב ב Streaming: ביצוע חישובים קטנים רבים, כחלק משטף של אירועים שנכנס למערכת.
בקטגוריה זו ניתן לכלול את Spark Streaming (המבוסס על Spark), אך אתמקד דווקא ב Storm – שנבנה לצורך זה במקור.
Storm
- מערכת Horizontal scalable ו fault-tolerant – כמובן!
- מערכת המספקת תשובות ב low-latency (שלא לומר real-time – כי זה פשוט לא יהיה נכון), תוך כדי שימוש בכוח חישוב מבוזר.
- At-lest-once Processing מה שאומר שהנחת היסוד במערכת המבוזרת היא שחלקים שלה כושלים מדי פעם, ולכן לעתים נעשה אותו חישוב יותר מפעם אחת (לרוב: פעמיים) – בכדי להמשיך לקבל תוצאות גם במקרה של כשל מקומי.
Storm נולדה מתוך פרויקט בטוויטר לחישוב trending analysis ב low-latency.
המבנה של Storm Computation Cluster הוא סוג של Pipes & Filters, כאשר יש:
- Stream – רצף של tuples של נתונים.
- Spout – נקודת אינטגרציה של המערכת, המזרימה נתונים ל streams.
- Bolt – נקודת חישוב (“Filter”) בגרף החישוב (“הטופולוגיה”).
על מערכת Storm מריצים טופולוגיות חישוב רבות במקביל – כל אחת מבצעת חישוב אחר. - Supervisor – הוא node של Storm, שכולל כמה Workers שיכולים להריץ את קוד ה bolts או ה spouts.
- Stream Grouping – האסטרטגיה כיצד לחלק עבודה בין bolts שונים.
אם העבודה היא חישובית בלבד – כל אחד מה supervisors יוכל להריץ את כל הטופולוגיה (ואז לא צריך להעביר נתונים בין מחשבים במערכת – שזה עדיף). נוכל לבחור, למשל, ב Local Stream Grouping – שמורה ל Storm לשלוח את הנתונים ל Worker על אותה המכונה.

Mahout
ארצה להזכיר בקצרה את Apache Mahout (שהוא חלק מ Hadoop), פירוש השם Mahout הוא “נהג הפיל” – ואכן Mahout רוכבת על הפיל (Hadoop).
זהו כלי שמספק סט של אלגוריתמים שימושיים הממומשים בצורה מבוזרת, חלקם על גבי Map-Reduce וחלקם על גבי Spark – ומאפשר לעבוד ברמת הפשטה גבוהה אפילו יותר.
סוגי הבעיות ש Mahout מכסה (כל אחת – בעזרת כמה אלגוריתמים שונים) הן:
- סינון שיתופי – “אנשים שאהבו את x אהבו גם את y”
- Classification – אפיון “מרחב” הנתונים מסוג מסוים – גם אלו שאין לנו. סוג של חיזוי.
- רגרסיות – הערכת הקשרים בין משתנים שונים.
- Clustering – חלוקה של פריטי מידע לקבוצות – ע”פ פרמטרים מסוימים.
- ועוד
.
Lambda Architecture
הכלים שהצגנו למעלה – מאפשרים לנו בחירה בין:
- תשובה מדויקת – אטית (למשל: Map-reduce)
- תשובה “לא מדויקת” – מהירה (למשל: Storm או Spark)
בהנחה שאנו מקריבים דיוק עבור המהירות. למשל: משתמשים בחישוב בנתונים בני חמש דקות – ולא העדכניים ביותר, או חישוב על סמך דגימה חלקית – ולא על סמך כלל הנתונים.
בארגון בעל צרכי מידע – למה לבחור בין האופציות? האם אי אפשר לקבל את שניהם?
סוג של שילוב מקובל בין 2 הגישות נקרא “ארכיטקטורת למבדה”, ספק אם על שם האות היוונית למבדה (Λ) שאם מטים אותה על הצד מזכירה מאוד את צורת הארכיטקטורה – ואולי בהקשר ל Lambda Calculus המבוססת על פישוט המערכת לפונקציות (בהקשר של תכנות פונקציונלי). הנחות היסוד של הארכיטקטורה הן:
- מייצרים במערכת התפעולית שטף של אירועים – כולם immutable (בתרשים למעלה: “new data stream”).
- את האירועים אוספים ומפצלים ל-2 ערוצים:
- ערוץ של שמירה לטווח ארוך – נוסח HDFS או AWS S3, ואז ביצוע שאילתות ארוכות באצווה (Batch).
- ערוץ של עיבוד מיידי והסקת תובנות – נוסח Storm, שבו ההודעות ייזרקו מיד לאחר שיעובדו (הן שמורות לנו מהערוץ הראשון) – הערוץ המהיר.
- את התובנות קצרות הטווח של הערוץ המהיר, ואת התובנות ארוכות הטווח – של ערוץ האצווה מרכזים ל storage שלישי (Service Layer) – לרוב HBase, Impala או RedShift, ממנו נבצע שליפות חזרה למערכת התפעולית.
כמו ארכיטקטורה, “ארכיטקטורת למבדה” היא רק תבנית מקובלת – ונכון יותר יהיה להתאים אותה לצרכים הייחודיים שלכם, ולא להיצמד בכוח לאיזו הגדרה מקובלת..
Apache Kafka
לסיום אני רוצה להזכיר עוד מערכת מבוזרת שהופכת פופולרית – Distributed Message Queue בשם קפקא.
בדומה למערכות הקודמות, קפקא היא מערכת Horizontally scalable ו Fault-tolerant.
בנוסף יש לה מימד של durability – הודעות נשמרות לדיסק ומשוכפלות בין מחשבים שונים.
בקפקא נלקחו בה כמה החלטות תכנוניות בכדי לתמוך בשטף אדיר של אירועים. קפקא מטפלת יפה בקצבים גבוהים מאוד של נתונים, במחיר של latency מסוים בכתיבת ובשליפת הודעות. שמעתי על חבר’ה שחווים latency אף של ל 2-3 שניות.
כלל-אצבע ששמעתי אומר כך: “אם יש לך עד 100,000 הודעות בשנייה – השתמש ב RabbitMQ, אם יש יותר – כדאי לשקול מעבר לקפקא”.
כמה הנחות בעולם של קפקא הן:
- הודעות ב Queue הן immutable – בלתי ניתנות לשינוי. ניתן להוסיף הודעות ל queue – ולקרוא אותן, אך לא לעדכן או למחוק אותן. ההודעות יימחקו כחלק מתהליך של ה Queue (למשל: לאחר 24 שעות) – והצרכן צריך לרשום לעצמו ולעקוב אלו הודעות כבר נקראו – כדי לא לקרוא אותן שוב. הן פשוט יושבות שם – כמו על דיסק.
קפקא מנהלת את ההודעות כקובץ רציף בדיסק – מה שמשפר מאוד את יעילות ה I/O שלה. כל הפעולות נעשות ב Bulks. - ה Queue נקרא “Topic” ולא מובטח בו סדר מדויק של ההודעות (פרטים בהמשך).
- Broker הוא שרת (או node) במערכת. אם לברוקר אין מספיק דיסק בכדי לאכסן את תוכן ה Topic, או שאין לו מספיק I/O בכדי לכתוב לדיסק את ההודעות בקצב מהיר מספיק – יש לחלק (באחריות ה Admin האנושי) את ה topic על גבי כמה partitions. ההודעות שמתקבלות ישמרו על partitions שונים, וקריאת ההודעות – גם היא תעשה מה partitions השונים.
סיכום
בפוסט אמנם הצגתי הרבה מוצרי “Big Data” – אך הכוונה הייתה להתמקד באספקטים המבוזרים שלהם.
מערכות מבוזרות מוסיפות סיבוך למערכת – ולכן כדאי להימנע מהן במידת האפשר. כן… ברור לי שלא-מעט מהנדסים מחפשי-אתגר ימצאו את הדרך להשתמש במערכות מבוזרות – גם שאין צורך עסקי לכך.
יש הגיון מסוים, שבמקום לכתוב באסמבלי מערכת מבוזרת בעצמנו – נשתמש במערכות קיימות. המערכות שהצגתי בפוסט זה הן דוגמה טובה לכך. מכאן-לשם, חלק מהמערכות לא יעשו בדיוק את מה שאתם זקוקים לו, וזה רגע טוב לצלול לשכבת הפשטה נמוכה יותר (כמו Zookeeper – או קוד שלכם) – ולפתור נקודתית את הבעיה.
שיהיה בהצלחה!
—-
לינקים רלוונטיים:
השוואה בין Spark ל YARN