braindump ... cause thread-dumps are not enough ;)

Testing MR jobs

Some links with useful information regarding MR job testing:

Writing an MR job which reads (scans) a table from HBase

HBase allows efficient reads and writes of single rows (as opposed to Hive, which is better at processing them in bulk). Unfortunately, sometimes it is actually required to read and process a bunch of rows together (e.g. to compute some aggregates). There are a couple of things which could be done:

  • create an external table in Hive which would be backed by HBase (queries will run - but they will be very slow)
  • use tools like Phoenix (no idea how it works in terms of performance)
  • write your custom MR job which does the processing - this post is exactly about it :)

I was writing my code from scratch; below is a list of problems which I encountered and managed to work around or find a decent fix for.

  1. Adding required external dependencies (jars) for the job.
In my case this was the `hbase.jar`. According to the official [documentation][2], additional files should be added to the distributed cache in the job configuration. The actual solution is quite nice: `TableMapReduceUtil.initTableMapperJob` and `TableMapReduceUtil.initTableReducerJob` add this file automatically.
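For jars that still have to be shipped by hand, HBase also has a helper which pushes a job's dependencies to the distributed cache. A minimal standalone sketch, just to show the call:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ShipDeps {
  public static void main(String[] args) throws IOException {
    Job job = Job.getInstance(HBaseConfiguration.create());
    // Adds the HBase jars (and the jars of the job's configured
    // mapper/reducer/input/output classes) to the distributed cache.
    TableMapReduceUtil.addDependencyJars(job);
  }
}
```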
  2. `job.submit()` throws:
```
Exception in thread "main" org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:133)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:433)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:335)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1286)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1283)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1283)
```

The problem was that the output format was not set. Adding `job.setOutputFormatClass(NullOutputFormat.class)` fixed the issue. Using `NullOutputFormat` means that the job will not output anything _(at that point I didn't want to emit anything; later on I actually specified a different output format, which stored the data properly in HDFS)_.
  3. `job.submit()` throws again:
```
Exception in thread "main" java.io.IOException: No table was provided.
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:152)
at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:468)
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:485)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:369)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1286)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1283)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1283)
```

The problem was probably connected with the fact that the mentioned test table didn't exist :) After creating it in HBase the problem vanished.
At this point I got some sample code which started working: 

```java
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class ReadOnly {

  private static class MyReadingMapper extends TableMapper<Text, Text> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value,
                       Context context)
        throws IOException, InterruptedException {
      // just print out the row key
      System.out.println("Got a row with key: " +
                         Arrays.toString(key.get()));
    }
  }

  public static void main(String[] args)
      throws IOException, ClassNotFoundException, InterruptedException {

    Configuration configuration = HBaseConfiguration.create();

    Job job = Job.getInstance(configuration);
    job.setJarByClass(MyReadingMapper.class);

    Scan scan = new Scan();
    scan.setCaching(500);       // fetch rows in batches of 500
    scan.setCacheBlocks(false); // don't pollute the block cache with MR scans

    TableMapReduceUtil.initTableMapperJob(
            "test_table",          // input table
            scan,
            MyReadingMapper.class, // mapper
            Text.class,            // mapper output key
            Text.class,            // mapper output value
            job);
    job.setOutputFormatClass(NullOutputFormat.class);

    // waitForCompletion() submits the job and blocks until it finishes.
    job.waitForCompletion(true);
  }
}
```

Observations:
  • The job spawns one mapper per Region Server.
  • Although I didn't configure the reduce step at all, some reducers were kicked off.
  4. I wanted to use `MultipleOutputFormat` (to output multiple files from the job). It turned out that this API is not supported anymore in Hadoop 2 - you need to use `MultipleOutputs` instead. But this solution has an advantage: I might not need to configure reducers at all! A sketch follows below. Some useful docs: here, here and here.
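A minimal sketch of a map-side `MultipleOutputs` setup - the named output `perUser`, the payload and the directory layout are my own illustrative assumptions, not the original job's:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class SplittingMapper extends TableMapper<Text, Text> {

  // Driver-side setup (next to initTableMapperJob):
  //   MultipleOutputs.addNamedOutput(job, "perUser",
  //       TextOutputFormat.class, Text.class, Text.class);
  //   // LazyOutputFormat avoids creating empty default part-files:
  //   LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
  //   FileOutputFormat.setOutputPath(job, new Path("/user/test/prototype"));

  private MultipleOutputs<Text, Text> outputs;

  @Override
  protected void setup(Context context) {
    outputs = new MultipleOutputs<Text, Text>(context);
  }

  @Override
  protected void map(ImmutableBytesWritable key, Result value, Context context)
      throws IOException, InterruptedException {
    String rowKey = Bytes.toString(key.get());
    // The last argument (base output path) picks the target file, so rows
    // can land in different directories depending on their content.
    outputs.write("perUser", new Text(rowKey), new Text("payload"),
        rowKey.substring(0, 1) + "/part");
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    outputs.close();
  }
}
```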

  5. This time Hadoop attacked with an "output directory already exists" exception:

```
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://myhadoop/user/test/prototype already exists
at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:141)
at org.apache.hadoop.mapreduce.lib.output.FilterOutputFormat.checkOutputSpecs(FilterOutputFormat.java:61)
at org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat.checkOutputSpecs(LazyOutputFormat.java:83)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:433)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:335)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1286)
at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1283)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1283)
```

The solution was to extend the `TextOutputFormat` class as instructed [here][7] and use the new class as the mapper's output format. Afterwards the job was properly spitting out files to multiple directories according to the HBase row content. Phew...
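For completeness, a sketch of what such a subclass can look like (the class name is mine; the idea is simply to keep the "no output path" validation while skipping the parent's already-exists check):

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OverwritingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException {
    Path outDir = getOutputPath(context);
    if (outDir == null) {
      // Keep the original "no output path" validation...
      throw new InvalidJobConfException("Output directory not set.");
    }
    // ...but don't throw FileAlreadyExistsException when the directory exists.
  }
}
```

Wire it in with `job.setOutputFormatClass(OverwritingTextOutputFormat.class);`.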

Throughout the whole process of writing the job, the HBase manual turned out to be very helpful.

mockito - "any" and "isA" matchers

The most obvious interpretation of the `any()` matcher in Mockito is that it accepts any object. OK - and what about `any(Clazz.class)`? It accepts any object of class `Clazz`.

Actually, both of those sentences contain fallacies... First: `any()` also accepts null arguments (it doesn't require the argument to actually exist!). Secondly: `any(Clazz.class)` accepts an object of... any class! In most cases it is probably better to use the `isA()` matcher, which actually checks the class of the passed object (and the object needs to exist in the first place!).
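A quick sketch of the difference (assuming the classic pre-2.x Mockito semantics described above, where `any(Clazz.class)` performs no type or null check):

```java
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.List;

public class MatcherDemo {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    List<String> list = mock(List.class);
    list.add(null);

    // Passes: any(String.class) matches anything, even null.
    verify(list).add(any(String.class));

    // Would fail: isA(String.class) requires an actual String instance.
    // verify(list).add(isA(String.class));
  }
}
```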

Notes on "Clean Code - Concurrency" chapter

  • Writing clean parallel programs is hard.
  • Concurrency helps to decouple what is executed from when it is executed.
  • Concurrency introduces overhead - both in performance and in the effort of writing code.
  • Concurrency errors are hard to trace and often hard to reproduce.
  • Concurrency often requires changing fundamental design rules of a project.
  • The number of critical sections should be as small as possible.
  • The size of each critical section should be as small as possible.
  • Suggestions:
    • Code related to concurrency should be kept separate from business logic.
    • Access to data which can be shared should be limited through encapsulation.
    • It is worth trying to split data into independent subsets which can be processed separately by independent threads (possibly on separate processors).
    • Since 1.5, Java has a nice built-in library (java.util.concurrent) for dealing with common parallel-processing problems - learn how to use it (see the sketch after this list).
    • Write tests which can expose concurrency problems - run them as often as possible, on a broad range of machines, operating systems, configurations, etc.
    • When debugging, first ensure that the business logic (single-threaded code) works correctly.
    • Write parallel code which can be configured, and test how it works in many different configurations.
    • Don't ignore one-off system failures - they are probably just hard-to-reproduce concurrency problems.
    • Enhance code with elements which can introduce failures. (What about introducing elements on the infrastructure level which deliberately cause problems, to test whether the whole system is resilient? Simian Army to the rescue!)
  • The ideal way of eliminating problems with shared data is to eliminate data sharing (e.g. by copying whole structures and treating them as read-only).
  • Threads should be as independent as possible.
  • Dependencies between synchronized methods can introduce hard-to-trace problems - it is suggested to keep a single synchronized method per class.
  • Testing doesn’t guarantee correctness - it just minimizes the risk of writing incorrect software.
  • Concurrent code should be tunable.
  • Run more threads than you have processors - it increases the probability of detecting deadlocks.
  • Concurrent code should be executed often on all target platforms.
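A minimal sketch of the java.util.concurrent library in action (my own example, not from the book): an `ExecutorService` spreads independent chunks of work over a fixed thread pool and gathers the results through `Future`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolDemo {
  public static void main(String[] args)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<Integer>> results = new ArrayList<Future<Integer>>();

    // Independent subsets of data processed by independent threads.
    for (int i = 0; i < 8; i++) {
      final int chunk = i;
      results.add(pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() {
          return chunk * chunk; // stand-in for real per-chunk work
        }
      }));
    }

    int sum = 0;
    for (Future<Integer> result : results) {
      sum += result.get(); // blocks until that chunk is done
    }
    System.out.println("sum = " + sum);

    pool.shutdown();
  }
}
```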

Notes on "Clean Code - Project Development" chapter

  • Four rules of a simple project:
    1. It passes all tests.
      • A system which is thoroughly tested and passes the tests each time is a testable system.
      • Having a testable system enables its evolution and makes it possible to refactor/extend it.
    2. It doesn't contain duplication.
      • Improving the system structure can involve any kind of good practice:
        • modularization,
        • separation of concerns,
        • introducing loose coupling and making classes highly cohesive.
    3. It expresses the intent of the programmer.
      • Most of the time spent on a software project goes into long-term maintenance.
      • The easier the code is to understand, the less time other people need to spend analyzing it.
      • Use standard nomenclature.
    4. It minimizes the number of methods and classes.
      • In the process of creating small methods and small classes one can end up with a huge number of them - which is why this rule says that the number of methods and classes should also be kept small.
      • A big number of classes and methods is often a result of pointless dogmatism.
      • This rule is the least important of the four.
  • Rules 2-4 can be applied only when tests are in place.