The latest release of the DataFrame API in Apache Spark 1.3 adds another tool to the toolbox for quickly processing large amounts of data. As always, we’re excited to get our hands on something new to see how we can apply it to relevant and interesting investment management problems.
The data frame concept is a very powerful abstraction that takes full advantage of Spark SQL and its logical optimizer, allowing complex queries to be expressed concisely as a logical sequence of operations. Reminiscent of the dplyr package in R or pandas in Python, transformations, filters and selections can easily be applied to a distributed data set. For investment management, the API lets us get answers to complex questions on very large data sets quickly. This case study explores how we applied it to one such problem.
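To give a flavor of that style before diving in, here is a minimal sketch of chaining a filter, a projection and an aggregation on a DataFrame. The file path, column names and threshold are purely illustrative, not part of the data set used below:

// Hypothetical example: average large balances per region.
import org.apache.spark.sql.functions.avg

val balances = sqlContext.jsonFile("/data/balances.json")  // hypothetical path
balances.
  filter(balances("balance") > 100000).                    // keep large balances only
  select(balances("region"), balances("balance")).
  groupBy("region").
  agg(avg(balances("balance"))).
  show()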
Informing Delinquent Loan Behavior
Recently, we were running some tests using loan-level credit disclosure data that Freddie Mac publicly releases as part of its effort to increase transparency. The data consists of approximately 40GB of anonymized credit performance data for individual mortgages that Freddie Mac purchased or guaranteed between 1999 and 2013. This “medium-sized” data set is just large enough to be outside the comfort zone of typical Excel and R users without clever manipulation. We explored it with the Spark DataFrame API to see how easily we could answer a simple question: how many loans that were 30-59 days delinquent in one month caught up on their payments the next month (i.e. how many loans cured per month)?
After downloading the files and dropping them onto HDFS, we started a Spark shell and selectively loaded the fields of interest into a data frame.
scala> case class Loan(
  loan_seq_no: String,        // unique loan identifier
  as_of_date: Int,            // monthly reporting period, as YYYYMM
  next_as_of_date: Int,       // the following period, as YYYYMM
  curr_delinq_status: String  // current loan delinquency status
)

import sqlContext.implicits._  // brings in rdd.toDF(); harmless if the shell already imported it
import au.com.bytecode.opencsv.CSVParser

val series = sc.
  textFile("/credit_disclosure/uncompressed/historical_data1_time_Q*.txt").
  mapPartitions(lines => {
    // Create one parser per partition rather than one per line.
    val parser = new CSVParser('|')
    lines.map(line => {
      val p = parser.parseLine(line)
      // Derive the following period's YYYYMM key, rolling December into January.
      val month = p(1).substring(4, 6).toInt
      val nextyear = p(1).substring(0, 4).toInt + (month / 12)
      val nextmonth = (month % 12) + 1
      Loan(p(0), p(1).toInt, (nextyear + "%02d".format(nextmonth)).toInt, p(3))
    })
  }).toDF()
For the sake of this exercise, we loaded the pipe-delimited data into an RDD and converted it into a data frame. Note that we compute each record’s next_as_of_date up front; this gives every observation a key to match against the following month’s observations later on.
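As a quick sanity check (not part of the original session, but something we’d typically run), printing the schema confirms the case class mapped through as expected:

scala> series.printSchema()
root
 |-- loan_seq_no: string (nullable = true)
 |-- as_of_date: integer (nullable = false)
 |-- next_as_of_date: integer (nullable = false)
 |-- curr_delinq_status: string (nullable = true)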
scala> series.count()
res1: Long = 759315910
Wrestling the Data for Answers
It looks like the data consists of over 750 million records. To determine how many loans cured, we’ll need to match months in which a loan was delinquent with the following month in which the same loan became current. Having loaded the relevant flag into the curr_delinq_status field of the data frame, we can first get a feel for how many observations fall into each status (0 = current, 1 = 30-59 days delinquent).
scala> import org.apache.spark.sql.functions.desc
scala> series.groupBy("curr_delinq_status").count().orderBy(desc("count")).show()
curr_delinq_status count
0                  729768147
1                  12978755
2                  3766027
3                  1834698
4                  1323766
5                  1084867
6                  915940
7                  784339
8                  676828
9                  586471
R                  545415
10                 511651
11                 446790
12                 392225
13                 344698
14                 304884
15                 270558
16                 241441
17                 216052
18                 194237
Current observations (curr_delinq_status of 0) dominate by a large margin, with the deeper delinquency buckets shrinking rapidly (the non-numeric R status denotes REO acquisition), so we can expect the number of cures per month to be fairly small. To get the actual number of loans that cured, we can filter out the delinquent loans and the current loans separately, then join them back together.
*Note: to avoid ambiguous column references when joining the data frame with itself, we aliased the columns so that the two sides of the join don’t share names.
// Loans 30-59 days delinquent, keyed by the month that follows them.
val delinq = series.
  filter(series("curr_delinq_status") === "1").
  select(series("loan_seq_no").as("delinq_id"),
         series("as_of_date").as("delinq_month"),
         series("next_as_of_date").as("next_month"))

// Loans that are current, keyed by their own reporting month.
val current = series.
  filter(series("curr_delinq_status") === "0").
  select(series("loan_seq_no").as("curr_id"),
         series("as_of_date").as("month"))

// A loan cured if it was delinquent one month and current the next.
val curedPerMonth = delinq.
  join(current, delinq("delinq_id") === current("curr_id") &&
                delinq("next_month") === current("month")).
  groupBy("next_month").
  count().
  orderBy(desc("next_month"))

// Total observations per month, for the denominator.
val loansPerMonth = series.
  groupBy("as_of_date").
  count().
  orderBy(desc("as_of_date"))
Finally, we join everything back together to compute the percentage of loans cured each month.
val percentCured = loansPerMonth.
  join(curedPerMonth,
       loansPerMonth("as_of_date") === curedPerMonth("next_month")).
  select(
    loansPerMonth("as_of_date"),
    curedPerMonth("count").as("cured"),   // delinquent loans that became current
    loansPerMonth("count").as("total"),   // all observations in the month
    (curedPerMonth("count") / loansPerMonth("count")).as("percentCured"))
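To hand the result off for plotting, it needs to be written out of Spark. A minimal sketch of one way to do that in Spark 1.3 (the output paths here are hypothetical):

// Sketch only: persist the result for downstream analysis (paths are hypothetical).
percentCured.saveAsParquetFile("/credit_disclosure/output/percent_cured")

// Or as pipe-delimited text via the underlying RDD:
percentCured.rdd.map(_.toSeq.mkString("|")).
  saveAsTextFile("/credit_disclosure/output/percent_cured_txt")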
Results
Saving this result to disk and creating a simple plot reveals that the number of loans curing each month increased steadily, but in concert with the total number of loans in the data set. The overall percentage of loans cured per month is, as we expected, very small.
Lessons Learned
The journey to answer a simple question from this data took longer than we expected, but we’re confident efficiency will improve as the documentation and examples mature. At the end of the day, the ability to transform and manipulate large data sets with a simple, clear and streamlined syntax proves very powerful. We’ll continue to look for ways to apply these techniques to similar interesting questions and challenges in the investment management space.
The data provided is for informational purposes only. The information and opinions contained on this website are derived from proprietary and non-proprietary sources deemed by BlackRock to be reliable, are not necessarily all inclusive and are not guaranteed as to accuracy. BlackRock shall not have any liability for the accuracy of the information contained herein, for delays or omissions therein, or for any results based on the use of such information.
©2015 BlackRock, Inc. All rights reserved. BLACKROCK and ALADDIN are registered and unregistered trademarks of BlackRock, Inc., or its subsidiaries in the United States and elsewhere. All other marks are the property of their respective owners.
TECH-0011