One Billion Rows Challenge

26 Apr 2024

6 minute read

At the beginning of the year, The One Billion Row Challenge gained popularity. The challenge is to process a billion rows of data as fast as possible in Java. Recently, I decided to take on the challenge, but in Rust instead of Java.

Since I didn’t want to spend the rest of eternity optimizing my code, I set a more achievable goal: to get it under 10 seconds without looking at other solutions. Surprisingly, I reached that goal much faster than anticipated, with just three optimizations.

Naive Solution

Before starting with optimizations, I first wanted to see how quickly a naive solution performs and compare it to the naive solution in Java. On my machine¹, the naive solution in Java takes 3 minutes and 20 seconds, while in Rust it takes 1 minute and 35 seconds.

My naive solution simply iterates through the lines of a buffer using an io::BufReader, like this:

let file = File::open("measurements.txt").unwrap();
let lines = io::BufReader::new(file).lines().flatten();

let mut stats: HashMap<String, Stat> = HashMap::new();

for line in lines {
    let (name, temp) = parse_line(line);
    let stat = match stats.get(&name) {
        None => Stat::new(temp),
        Some(stat) => stat.update(temp),
    };
    stats.insert(name, stat);
}

print_stats(&stats);

The Stat struct just holds the minimum and maximum temperatures, the sum of the temperatures, and the number of entries for each location. This lets us compute the average temperature when printing the results.
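Roughly, it looks something like this (the exact field names don't matter much):

struct Stat {
    min: f32,
    max: f32,
    sum: f32,
    count: u32,
}

impl Stat {
    fn new(temp: f32) -> Self {
        Stat { min: temp, max: temp, sum: temp, count: 1 }
    }

    // Returns an updated copy, matching the get-then-insert pattern above
    fn update(&self, temp: f32) -> Self {
        Stat {
            min: self.min.min(temp),
            max: self.max.max(temp),
            sum: self.sum + temp,
            count: self.count + 1,
        }
    }
}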

Parallelization

The most obvious optimization we can do is to parallelize the parsing of the file. We can split the file into multiple parts, and each part gets processed by a different thread using the same method as described above. A straightforward approach to splitting the file is to make each thread process total_size / nr_threads bytes of the file. However, as the saying goes, the devil is in the details. Each thread must begin reading at the start of a line, and we must also be careful when we stop reading.
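The dispatch side then boils down to spawning one worker per chunk and merging the partial maps at the end, roughly like this (read_part is the per-thread worker developed below; Stat::merge is a small helper, not shown, that combines two partial results):

let file_len = std::fs::metadata(PATH).unwrap().len();
let nr_threads = std::thread::available_parallelism().unwrap().get() as u64;
let chunk = file_len / nr_threads;

let handles: Vec<_> = (0..nr_threads)
    .map(|i| {
        let start = i * chunk;
        // The last thread also picks up the remainder of the division
        let len = if i == nr_threads - 1 {
            (file_len - start) as usize
        } else {
            chunk as usize
        };
        std::thread::spawn(move || read_part(start, len))
    })
    .collect();

let mut stats: HashMap<String, Stat> = HashMap::new();
for handle in handles {
    for (name, stat) in handle.join().unwrap() {
        stats
            .entry(name)
            .and_modify(|s| s.merge(&stat))
            .or_insert(stat);
    }
}

print_stats(&stats);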

When a new thread is spawned, we move to the correct position in the file by finding the first newline character. However, if we are at the very beginning of the file, this step shouldn’t be done; otherwise, the first line will be missed. The following code shows this process:

fn read_part(start: u64, len: usize) -> HashMap<String, Stat> {
    let mut file = File::open(PATH).unwrap();
    file.seek(SeekFrom::Start(start)).unwrap();

    let mut read: usize = 0;

    if start != 0 {
        // Skip forward to just past the next newline so that this thread
        // starts parsing at the beginning of a full line
        let mut buf: [u8; 256] = [0; 256];

        file.read(&mut buf).unwrap();
        let add = buf.iter().position(|c| *c == b'\n').unwrap() as u64;

        file.seek(SeekFrom::Start(start + add + 1)).unwrap();
        read += (add + 1) as usize;
    }

    // ...
}

In the variable read, we count the number of bytes read from the start position. When read > len, we stop reading the batch. Note the strict inequality. This handles the edge case where one thread ends with start + len positioned exactly at the end of a line, and another thread starts with start positioned at the \n character.

Parallelization reduces our runtime from 1 minute and 35 seconds to 34 seconds.

Read Line Allocation

Parallelization was so obvious that I didn’t need to perform any profiling to come up with it. However, this optimization did require some profiling, even though it’s pretty obvious once you know about it.

Currently, we are reading the file line by line using BufReader’s .lines() iterator. This iterator provides us with a new String every iteration. Upon examining the profiler, I observed that a significant amount of time was spent on allocating a String and moving data from the internal buffer to the newly allocated string. However, for most iterations, we don’t need ownership of the string. If we are not inserting a new value into the HashMap, we only need a &str.

This can be achieved by using the .read_line() method instead of the .lines() iterator, like this:

fn read_part(start: u64, len: usize) -> HashMap<String, Stat> {
    // ...

    let mut reader = io::BufReader::new(&file);
    let mut line = String::new();

    loop {
        line.clear();
        let cur_read = reader.read_line(&mut line).unwrap();
        if cur_read == 0 {
            // We reached the end of the file
            break;
        }

        // Parse `line` and update the stats map,
        // stopping once `read` exceeds `len` as before
        // ...
    }

    // ...
}

Additionally, when updating a value in the HashMap, we can use the .get_mut() method to obtain a mutable reference and update it directly, instead of first calling .get() and then .insert(). With this approach, we only need to allocate a new String when inserting a new value into the HashMap.
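The update path then looks roughly like this, where name is a &str borrowed from the reused line buffer:

match stats.get_mut(name) {
    // Existing city: update in place, no allocation needed
    Some(stat) => *stat = stat.update(temp),
    // First time we see this city: the only case that needs an owned String
    None => {
        stats.insert(name.to_string(), Stat::new(temp));
    }
}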

This reduces the runtime to 24 seconds.

Custom Reader

Looking at the profiler, the results were better, but a majority of the time was still spent inside the BufReader’s .read_line() method. After digging a bit deeper, it turns out that the slowest part of .read_line() is std::str::from_utf8(), which converts a &[u8] into &str. I decided to write my own buffered reader and to use &[u8] and Vec<u8> until printing the output. This way, the slow .from_utf8() was only called once per city.
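Concretely, the conversion only has to happen when printing the final results; something along these lines (the output format is just illustrative):

for (name, stat) in &stats {
    // The one place a key is converted from bytes back to text
    let name = std::str::from_utf8(name).unwrap();
    println!("{name}: {}/{}/{}", stat.min, stat.sum / stat.count as f32, stat.max);
}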

I implemented my own buffered reading by simply reading a large chunk (1 MB) of the file into a buffer. The buffer is then processed line by line until the next line is no longer fully contained within the buffer, or we reach the end of the buffer. At that point, the unprocessed part of the buffer is copied to the beginning of the buffer, and the next part of the file is read into the remaining space.

For the implementation, I went the C way and wrote it inline as part of the main processing loop. In hindsight, I probably should have defined a new type, since that would have made the code more readable. With custom buffered reading, the code now looks like this:

fn read_part(start: u64, len: usize) -> HashMap<Vec<u8>, Stat> {
    // Open file and seek to the beginning of the line
    // ...

    // 1 MB read buffer; the window buf[start..end] holds unprocessed data
    let mut buf = [0u8; 1024 * 1024];
    let mut start = 0;
    let mut end = 0;

    loop {
        let cur_read = file.read(&mut buf[end..]).unwrap();
        if cur_read == 0 {
            // We reached the end of the file
            return stats;
        }
        end += cur_read;

        // Process every complete line currently in the buffer
        while let Some(line) = parse_line(&buf[start..end]) {
            if read > len {
                return stats;
            }
            read += line.data_read;
            start += line.data_read;

            // Store line data to HashMap
            // ...
        }

        // Move the unprocessed tail of the buffer to the front before
        // reading the next chunk of the file into the space behind it
        buf.rotate_left(start);
        end -= start;
        start = 0;
    }
}

The parse_line function was also changed to return an Option<ParsedLine> instead of a (&str, f32) tuple. The ParsedLine struct contains the name, the temperature, and the number of bytes that were read. Returning an Option lets the caller tell whether the buffer contained a complete line or only part of one. The parse_line function now looks like this:

fn parse_line<'a>(data: &'a [u8]) -> Option<ParsedLine<'a>> {
    // Find the ';' separating the city name from the temperature
    let name_end = data.iter().position(|c| *c == b';')?;

    // Find the '\n' ending the line; if either separator is missing,
    // the buffer does not contain a complete line and we return None
    let idx = (data[name_end..]).iter().position(|c| *c == b'\n');
    let line_end = name_end + idx?;

    let res = ParsedLine {
        data_read: line_end + 1, // includes the trailing '\n'
        name: &data[..name_end],
        temp: parse_temp(&data[(name_end + 1)..line_end]),
    };
    Some(res)
}
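For reference, the ParsedLine struct is just a small borrowed view into the buffer, along these lines:

struct ParsedLine<'a> {
    // Bytes consumed from the buffer, including the trailing '\n'
    data_read: usize,
    // City name, borrowed straight from the read buffer; no allocation
    name: &'a [u8],
    temp: f32,
}

And parse_temp works on the raw bytes directly instead of going through a &str. A sketch of one way to write it, assuming the challenge's measurement format (an optional sign and a decimal point):

fn parse_temp(data: &[u8]) -> f32 {
    // Handle an optional leading minus sign
    let (neg, digits) = if data.first() == Some(&b'-') {
        (true, &data[1..])
    } else {
        (false, data)
    };

    // Accumulate the digits, tracking how many came after the '.'
    let mut value = 0.0f32;
    let mut divisor = 1.0f32;
    let mut after_dot = false;
    for &c in digits {
        if c == b'.' {
            after_dot = true;
        } else {
            value = value * 10.0 + (c - b'0') as f32;
            if after_dot {
                divisor *= 10.0;
            }
        }
    }

    let value = value / divisor;
    if neg { -value } else { value }
}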

With this optimization, the runtime dropped to 6 seconds, hitting my initial sub-10-second goal. I could have pursued further optimizations, but I decided to move on to my next side project :)


  1. M1 MacBook Pro