Overview of Big Data and NoSQL Technologies as of January 2013

What are the current sources of data that needs to be processed and utilized by companies?

We can start from the most common:

  • Documents
  • Existing relational databases (CRM, ERP, Accounting, Billing)
  • E-mails and attachments
  • Imaging data (graphs, technical plans)
  • Sensor or device data
  • Internet search indexing
  • Log files
  • Social media
  • Telephone conversations
  • Videos
  • Pictures
  • Clickstreams (clicks from users on web pages)

When is it a good idea to look for a NoSQL / Big Data solution? What are the strong points of Big Data / NoSQL solutions?

  • If your relational databases do not scale to your traffic needs at an acceptable cost of hardware and/or licenses.

  • If the normalized schema of your relational database has become too complex: too many tables hold just a tiny proportion of the overall data, and you can no longer print the ERD on a single A3 page.

  • If your business applications generate lots of supporting and temporary data that does not really belong in the main data store. Such data includes customers' search results, visited pages, historical share prices, contents of abandoned shopping carts and so on.

  • If your database schema is already denormalized in order to improve the response times of your applications.

  • When joins in relational databases slow the system down to a crawl.

  • Relational data does not map well to typical programming structures, which often consist of complex data types or hierarchical data. Data such as XML is especially difficult because of its hierarchical nature. Complex objects that contain other objects and lists inside them do not always map directly to a single row in a single table.

  • If documents from different sources require a flexible schema or no schema at all, or if input data must be kept in its original format.

  • If ETL (Extract, Transform, Load) is required on source data. NoSQL engines or Map/Reduce can perform the ETL steps and produce output suitable for loading into an RDBMS.

  • If missing data can be ignored when the volume of data is large enough. A common maxim of Big Data is “More data beats clever algorithms”.

  • When flexibility is required for analytics. It allows experimentation to discover what questions we should be asking before defining a fixed data model.

  • In many NoSQL databases each data element or document is versioned, which enables queries for values at a specific time in history.

  • When we need to utilize the outputs of many existing systems. For example: in order to prepare a relevant offer for a customer we need information from the billing system, from the customer's historical orders, from orders of similar customers, as well as from the stock system and the CRM system. Traditional integration of all these systems is expensive and not very flexible.

  • When we need to analyze unstructured data such as documents and log files, or semi-structured data such as CSV files, forms and exports from other systems.

What are the strong points of relational databases?

  • The SQL language: it is well known, standardized and based on strong mathematical theory. (There are exceptions: Hive, Cassandra and other NoSQL databases support a subset of the SQL standard.)

  • When we can create a database schema that does not need to be modified during production.

  • When one server or a small cluster of servers can handle the production load and we do not need additional scalability.

  • Relational databases have mature security features: role-based security, encrypted communications, support for row- and field-level access control, and access control through user-level permissions on stored procedures. We can define rules over what data can be changed and what data is audited. Data can be easily removed, masked and secured.

  • Full support of the ACID (atomicity, consistency, isolation, durability) properties, which guarantee that database transactions are processed reliably. NoSQL databases usually offer only eventual consistency, and some support atomicity at the level of a single row or a single batch job.

  • Relational databases have support for backup and restore of data in case of data loss or corruption.

  • Relational databases have development, tuning and monitoring tools with good GUIs.

Batch vs Real-time processing


Where is batch processing of Big Data used?

Batch processing is used when real-time processing is not required, not possible or too expensive. We can see several areas:

  • Conversion of unstructured data such as text files and log files into more structured records.

  • Transformation during ETL. We import data in many formats and need to transform it into common formats that can be analyzed.

  • Ad-hoc analysis of data. We do not need any specialized data mining schema for our data, such as a star schema, data cubes and so on. We can explore our existing data and look for patterns and correlations, gaining more understanding of the data before we commit ourselves to creating a production data-analysis application.

  • Data analytics applications and reporting.

Batch processing infrastructure


Hadoop

Batch processing systems utilize the Map/Reduce and HDFS implementations in Apache Hadoop. It is possible to create a batch processing application in Java using only Hadoop, but we should mention other important tools and how they fit into the Hadoop infrastructure.
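
As an illustration, here is a minimal sketch of the classic word-count job written against the older org.apache.hadoop.mapred API. The class itself is illustrative rather than taken from the original text, but its Map and Reduce inner classes correspond to the kind of mapper and reducer classes the Oozie example later in this section refers to.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {

    // Mapper: emits (word, 1) for every token in the input line
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Reducer: sums the counts emitted for each word
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(args[0]));   // HDFS input directory
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));  // HDFS output directory
        JobClient.runJob(conf);
    }
}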

Apache Avro

In order to process data we need information about data types and schemas. This information is used for serialization and deserialization in network communication, as well as for reading from and writing to files.

Apache Avro is an RPC (Remote Procedure Call) and serialization system that supports rich data structures. It uses JSON for defining data types and protocols, and it serializes data in a compact binary format. Avro supports schema evolution: data written with an older version of a schema can still be read with a newer version. Avro requires schemas when data is written or read. Most interestingly, you can use different schemas for serialization and deserialization, and Avro will handle the missing, extra or modified fields.
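
As a brief illustration (the record and field names here are hypothetical), an Avro schema is plain JSON. The optional host field below, a union with null and a default value, is the kind of addition that schema evolution handles: records written before the field existed can still be read with the newer schema.

{
  "type": "record",
  "name": "LogRecord",
  "namespace": "org.example.logs",
  "fields": [
    {"name": "timestamp", "type": "long"},
    {"name": "level",     "type": "string"},
    {"name": "message",   "type": "string"},
    {"name": "host",      "type": ["null", "string"], "default": null}
  ]
}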

Apache Pig

We need a quick and simple way to create Map/Reduce transformations, analyses and applications, and a scripting language that can be used both in scripts and interactively on the command line.

Apache Pig is a high-level procedural language for querying large semi-structured data sets using Hadoop and the Map/Reduce platform. Pig simplifies the use of Hadoop by allowing SQL-like queries to run on a distributed dataset.

Below is an example that filters a log file for warning messages only and will run in parallel on a large cluster. The script is automatically transformed into a Map/Reduce program and distributed across the Hadoop cluster.

messages = LOAD '/var/log/messages';
warns = FILTER messages BY $0 MATCHES '.*WARN+.*';
DUMP warns;

Relational operators that can be used (a combined example follows the list):

FILTER Select a set of tuples from a relation based on a condition.
FOREACH Iterate the tuples of a relation, generating a data transformation.
GROUP Group the data in one or more relations.
JOIN Join two or more relations (inner or outer join).
LOAD Load data from the file system.
ORDER Sort a relation based on one or more fields.
SPLIT Partition a relation into two or more relations.
STORE Store data in the file system.
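
A slightly larger sketch combining several of these operators (the input path, output path and field names are hypothetical). It counts clicks per URL from tab-separated (user, url) records and stores the result back into HDFS:

clicks  = LOAD '/data/clicks' AS (user:chararray, url:chararray);
grouped = GROUP clicks BY url;
counts  = FOREACH grouped GENERATE group AS url, COUNT(clicks) AS hits;
sorted  = ORDER counts BY hits DESC;
STORE sorted INTO '/data/click_counts';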

Apache Hive

What if we want to use SQL to create Map/Reduce jobs?

Apache Hive is a data warehousing infrastructure built on top of Hadoop. It provides a simple query language called HiveQL, which is based on SQL. Hive has three main functions: data summarization, query and analysis. Hive automatically translates SQL-like queries into Map/Reduce jobs that run on the Hadoop cluster.

  • Hive also enables data serialization/deserialization and increases flexibility in schema design by including a system catalog called the Hive Metastore.

  • Hive is not designed for OLTP (Online Transaction Processing) workloads and does not offer real-time queries or row-level updates. It is best used for batch jobs over large sets of append-only data.

The HiveQL language supports the ability to (see the example after this list):

  • filter rows from a table using a where clause.
  • select certain columns from the table using a select clause.
  • do equi-joins between two tables.
  • evaluate aggregations on multiple "group by" columns for the data stored in a table.
  • store the results of a query into another table.
  • download the contents of a table to a local (NFS) directory.
  • store the results of a query in an HDFS directory.
  • manage tables and partitions (create, drop and alter).
  • plug in custom scripts in the language of choice for custom map/reduce jobs.
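
A small HiveQL sketch (the table name, columns and input path are hypothetical). It creates a table, loads data into it and runs an aggregation that Hive compiles into Map/Reduce jobs:

CREATE TABLE page_views (user_id STRING, url STRING, view_time BIGINT)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

LOAD DATA INPATH '/data/page_views' INTO TABLE page_views;

SELECT url, COUNT(*) AS views
FROM page_views
GROUP BY url
ORDER BY views DESC;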

Apache Oozie

We try to keep Map/Reduce jobs, Pig scripts and Hive queries simple and single-purposed. This enables us to process large amounts of data quickly without running out of memory.

How can we create complex ETL or data analysis in Hadoop?

The answer is to chain scripts so that the output of one script is the input of another. A linear chain of jobs is easy to implement in a Java Hadoop client application, but more complex workflows that represent real-world scenarios need a proper workflow engine such as Apache Oozie.

Apache Oozie is a server-based workflow engine specialized in running workflow jobs with actions that execute Hadoop Map/Reduce jobs, Pig jobs and others.

  • An Oozie workflow is a collection of actions arranged in a DAG (Directed Acyclic Graph). This means that a second action cannot run until the first one has completed.

  • Oozie workflow definitions are written in hPDL (an XML Process Definition Language similar to JBoss jBPM's jPDL).

  • Workflow actions start jobs in the Hadoop cluster. Upon action completion, Hadoop calls back to Oozie to notify it that the action has completed; at this point Oozie proceeds to the next action in the workflow.

  • Oozie workflows contain control flow nodes (start, end, kill, decision, fork and join) and action nodes (the actual jobs).

  • Workflows can be parameterized (using variables like ${inputDir} within the workflow definition).

An example of an Oozie workflow definition:

<workflow-app name='wordcount-wf' xmlns="uri:oozie:workflow:0.1">
    <start to='wordcount'/>
    <action name='wordcount'>
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.myorg.WordCount.Map</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.myorg.WordCount.Reduce</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${inputDir}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to='end'/>
        <error to='kill'/>
    </action>
    <kill name='kill'>
        <message>Something went wrong: ${wf:errorCode('wordcount')}</message>
    </kill>
    <end name='end'/>
</workflow-app>

Apache Sqoop


Apache Sqoop is a tool designed for transferring bulk data between Apache Hadoop and structured datastores such as relational databases or data warehouses.

  • It can be used to populate tables in Hive and HBase.

  • Sqoop integrates with Oozie, allowing you to schedule and automate import and export tasks.

  • Sqoop uses a connector-based architecture which supports plugins that provide connectivity to external systems.

  • Sqoop includes connectors for databases such as MySQL, PostgreSQL, Oracle, SQL Server and DB2, as well as a generic JDBC connector.

  • The dataset being transferred is sliced up into partitions, and a map-only job is launched in which individual mappers are responsible for transferring a slice of the dataset.

  • Sqoop uses the database metadata to infer data types.

A Sqoop example that imports data from the ORDERS table of a MySQL database into a Hive table running on Hadoop:

sqoop import --connect jdbc:mysql://localhost/acmedb \
--table ORDERS --username test --password **** --hive-import

Sqoop takes care of populating the Hive metastore with the appropriate metadata for the table and also invokes the necessary commands to load the table or partition.

An export from Hadoop back to the database looks similar.

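For illustration, a hedged sketch of the corresponding export command (the export directory is hypothetical). Sqoop reads the files under the given HDFS directory and inserts the rows into the target database table:

sqoop export --connect jdbc:mysql://localhost/acmedb \
--table ORDERS --username test --password **** \
--export-dir /user/hive/warehouse/orders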

Apache Flume

Apache Flume is a distributed system to reliably collect, aggregate and move large amounts of log data from many different sources to a centralized data store.

The Flume data flow (source, channel, sink):

  • Flume Source consumes events delivered to it by an external source like a web server.
  • When a Flume Source receives an event, it stores it into one or more Channels.
  • The Channel is a passive store that keeps the event until it is consumed by a Flume Sink.
  • The Sink removes the event from the Channel and puts it into an external repository like HDFS.

Apache Flume features:

  • Flume allows a user to build multi-hop flows where events travel through multiple agents before reaching the final destination.
  • It also allows fan-in and fan-out flows, contextual routing and backup routes (fail-over) for failed hops.
  • Flume uses a transactional approach to guarantee reliable delivery of events.
  • Events are staged in the channel, which manages recovery from failure.
  • Flume supports log stream types such as Avro, Syslog and Netcat.
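
A minimal sketch of a Flume agent configuration (the agent, source, channel and sink names as well as the HDFS path are hypothetical). It wires a netcat source through a memory channel into an HDFS sink; such a properties file is passed to the flume-ng agent command:

# agent1: netcat source -> memory channel -> HDFS sink
# start with: flume-ng agent --conf-file flume.conf --name agent1
agent1.sources  = src1
agent1.channels = ch1
agent1.sinks    = sink1

agent1.sources.src1.type     = netcat
agent1.sources.src1.bind     = localhost
agent1.sources.src1.port     = 44444
agent1.sources.src1.channels = ch1

agent1.channels.ch1.type     = memory
agent1.channels.ch1.capacity = 1000

agent1.sinks.sink1.type      = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://namenode/flume/events
agent1.sinks.sink1.channel   = ch1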

DistCp

  • DistCp (distributed copy) is a tool used for large inter/intra-cluster copying.
  • It uses Map/Reduce for its distribution, error handling and recovery, and reporting.
  • It expands a list of files and directories into input to map tasks, each of which will copy a partition of the files specified in the source list.
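
A hedged example invocation (the cluster host names and paths are hypothetical) that copies a directory from one cluster to another; the copy itself runs as a Map/Reduce job:

hadoop distcp hdfs://namenode1:8020/data/logs hdfs://namenode2:8020/data/logs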

Real-time processing – NoSQL Databases


  • Document stores: Apache CouchDB, MongoDB
  • Graph stores: Neo4j
  • Key-value stores: Apache Cassandra, Riak
  • Tabular stores: Apache HBase