Wikidata on Azure - Part 3: Preprocessing data for simple analytic queries

At the end of the previous part, we had the data dump in Azure Data Lake Store (ADLS), and we were already set up to start running queries with Azure Data Lake Analytics (ADLA). We just have to write these queries. OK, then this should be a short one.

Let's dive in!

Prerequisites

For this part, you will need

  • an Azure subscription - preferably the one you created in the first part of the guide. We will use Azure Data Lake Analytics and its Data Lake Store. You will need the data dump copied to the store - see the previous part.
  • [optional] if you want IDE features (autocomplete, etc.) while crafting your queries, try Visual Studio or Visual Studio Code. There are guides for setting up your ADLA development environment both for VS and for VSCode.

U-SQL introduction

U-SQL is the query language of ADLA. A U-SQL script consists of a series of steps. A typical script starts with an extract step, ends with an output step, and has some processing steps in between. The processing steps work on rowsets: non-persisted sets of data records. A step takes a rowset as input and produces another rowset as output. When you work with files, a basic flow looks like this:

  1. EXTRACT phase: the extract step extracts a rowset from the input file
  2. PROCESSING phase: a pipeline of processing steps calculates the result. A processing step transforms one rowset into another rowset.
  3. OUTPUT phase: the output step writes a rowset to the output file

You can get an even more basic flow by omitting the processing phase, as sketched below. Since OUTPUT can use a different format than EXTRACT, such a script can serve as a simple format converter.
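
To make the flow concrete, here is a minimal sketch of such a converter. The file paths and the two-column schema are made up for illustration: a TSV file is read with a built-in extractor and written straight back out as CSV, with no processing step in between.

// Read a TSV file into a rowset (hypothetical path and schema) ...
@Rows =
    EXTRACT [Name] string,
            [Population] int
    FROM "/input/cities.tsv"
    USING Extractors.Tsv();

// ... and write the same rowset out as CSV.
OUTPUT @Rows
TO "/output/cities.csv"
USING Outputters.Csv(outputHeader:true);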

This very brief introduction should be enough to start designing our first U-SQL query. To develop for ADLA, you should set up your dev environment by following the tutorials mentioned in the prerequisites section. These tutorials also include some simple queries to try out.

Extraction phase

Regardless of the particular query task, the first step is to parse the Wikidata data dump into a rowset. The EXTRACT expression needs the following information:

  • path to the input file
  • the extractor type
  • column specifications (label and type) in the extracted rowset

The path should be easy, as we have the input file in ADLS. A leading / denotes the root folder of the ADLS (source), and we can construct a relative path starting from there.

The extractor type should refer to a C#/.NET type implementing a specific interface (IExtractor). There are built-in extractors, or you can use a user-defined one.

Our decompressed dump is a single (but huge) JSON array as the Wikidata documentation states:

JSON dumps containing all Wikidata entities in a single JSON array. [...] Each entity object (data item or property) is placed on a separate line in the JSON file, so the file can be read line by line, and each line can be decoded separately as an individual JSON object.

Currently, there is no built-in extractor for JSON formatted files, but there are a few sample implementations in the Azure GitHub repo. Let's take a closer look at the alternatives.

JsonExtractor

From the readme:

The JSON Extractor treats the entire input file as a single JSON document.

Moreover, it processes the entire input file in a single step, i.e. it does not utilize the parallel processing model of the built-in extractors. Hence, it is better suited to a large number of small files than to a single huge JSON like the one we have at hand. Can we do better?

MultiLevelJsonExtractor

This extractor extends the JsonExtractor above. For each column in the rowset, a JSONPath expression can be specified in the constructor. Then the extraction process finds the value for the corresponding column by navigating via the expression.
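
Here is a rough sketch of how that looks in a script. The file path, columns and JSONPaths are made up, and the constructor shape (a row path, a flag to tolerate missing paths, then one JSONPath per extracted column, in column order) follows the sample's readme - treat the exact signature as an assumption:

REFERENCE ASSEMBLY [Newtonsoft.Json];
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

// Each element of the "addresses" array becomes a row; the columns are
// filled by evaluating the given JSONPaths against that element.
@Addresses =
    EXTRACT [city] string,
            [zip] string
    FROM "/input/small-file.json"
    USING new MultiLevelJsonExtractor("addresses[*]",
                                      false,
                                      "city",
                                      "zip");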

So, it is a little bit smarter, but it has the same shortcomings for our use case as its base class. Can we do better?

Built-in text extractor

The JsonExtractor readme advises a neat little trick for files containing multiple JSON documents, or more precisely, for files with one JSON document per line. Use a built-in extractor to split the file line by line, i.e. extract a rowset with one string column, then parse the string column in a subsequent processing step. All the built-in extractors support the "one line in the input = one row in the rowset" mode, so that sounds rather promising!
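
A sketch of this trick, with a hypothetical one-document-per-line input file: the non-printing '\b' character is used as the column delimiter (it never occurs in the data), so each line lands in a single string column, which is then parsed with the JsonTuple function from the same sample library.

REFERENCE ASSEMBLY [Newtonsoft.Json];
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

// One row per line, the whole line in a single string column.
@RawLines =
    EXTRACT [JsonLine] string
    FROM "/input/one-json-document-per-line.json"
    USING Extractors.Text(delimiter:'\b', quoting:false);

// Parse the string column in a subsequent processing step.
@Parsed =
    SELECT JsonFunctions.JsonTuple([JsonLine]) AS Fields
    FROM @RawLines;

@Ids =
    SELECT Fields["id"] AS Id,
           Fields["type"] AS Type
    FROM @Parsed;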

However, a few limitations are hiding in the fine print. A string column has a maximum size of 128 kB. Unfortunately, some lines in the dump exceed that limit; in fact, the longest rows are bigger than 2 MB. Now what ... raging politely, maybe? We have a few other options:

  • use an array (e.g. byte[]) as the column datatype. That pushes the limit up to 4 MB - note that "U-SQL rows can contain up to 4 MB of data per row" (https://msdn.microsoft.com/en-us/azure/data-lake-analytics/u-sql/rowset-the-processing-data-structure-u-sql). Then, in a processing step, truncate or split the array to get the relevant part and convert that part (hopefully smaller than 128 kB) to a string. However, the built-in extractors and the JsonExtractor can only extract a serialized byte array into a byte array column. For example, the built-in extractors can interpret a hex string as a byte array - see the byte[] row in this table. In our case, we would need a JSON fragment to be converted to a byte array - and this is not supported by any of the extractors mentioned so far.
  • write a custom extractor that parses only the relevant part of the input file. The 4 MB limit still applies. What is the relevant part, anyway? It depends on the query task. In the worst case, we have to customize our extractor or processing step - or write a new one - for each and every query.

So do we have to write a custom extractor, or the other option is to ... hmm ... we have to write a custom extractor??

Enhanced JsonExtractor on branch mrys-json

There is an enhanced version of JsonExtractor currently living in the mrys-json branch of the official U-SQL repo. Authored by Microsoft fellow Michael Rys, this extractor assigns the byte array representation of the corresponding JSON fragment to byte array columns. Exactly what we need for the long line problem!

Which extractor should we use, then? Rys' JsonExtractor can handle long lines, but it cannot process the input line by line as a built-in extractor would.

Okay, so ultimately, do we have to write the ultimate extractor?

MultilineJsonExtractor

Fortunately, we have already forked the mrys-json branch and created the MultilineJsonExtractor for you. It derives from an enhanced JsonExtractor, which in turn derives from the mrys-json-branch JsonExtractor. This double-enhanced JsonExtractor has the following capabilities:

  • tweaks mrys-json-branch JsonExtractor by disabling the formatting of JSON outputs. This can reduce the size of the extracted JSON by more than 50%
  • configurable byte array projection mode. It can populate a byte array column from the corresponding JSON fragment in the following ways:
    • Built-in extractor mode deserializes the JSON fragment into a byte array. The format of the fragment needs to be supported by JSON.NET's ToObject method. This mode is the same as in the original JsonExtractor.
    • Mrys-json-branch extractor mode assigns the byte array representation of the fragment string. It uses UTF-8 encoding to convert the string to a byte array. This mode is the same as in the JsonExtractor on the mrys-json branch.
    • Compressed mode: the same as above, but the byte array gets GZip-compressed before assignment. This comes in handy if you have lines longer than 4 MB.
  • it can silently skip malformed JSON objects
  • users can specify the maximum number of JSON objects to parse
  • upgraded the JSON.NET dependency to v11, which brings enhanced JSONPath support, among other things
  • added some unit tests

The MultilineJsonExtractor (MJE) builds upon these features and adds the capability to process the input line-by-line in parallel.

Let's apply MJE to the Wikidata dump.

Extracting from Wikidata data dump with MultilineJsonExtractor

Below is a schematic sample of the Wikidata dump file, showing its top-level structure. For the sake of readability, there are many more line breaks here than in a real dump file. The positions of the real line breaks are indicated with {LINEBREAK}.

[{"id": "Q60",
  "type": "item",
  "labels": {},
  "descriptions": {},
  "aliases": {},
  "claims": {},
  "sitelinks": {}
},{LINEBREAK}
{"id": "Q61",
  "type": "item",
  "labels": {},
  "descriptions": {},
  "aliases": {},
  "claims": {},
  "sitelinks": {}
},{LINEBREAK}
...
{"id": "Q60",
  "type": "item",
  "labels": {},
  "descriptions": {},
  "aliases": {},
  "claims": {},
  "sitelinks": {}
}]{LINEBREAK}

One row ({LINEBREAK} to {LINEBREAK}) represents the full data of a single Wikidata item (or property). Items have a unique identifier stored in the id property. For example, id Q42 identifies the author Douglas Adams. You can check his data by going to the Wikidata pages of item Q42:
https://www.wikidata.org/wiki/Q42 (HTML formatted), https://www.wikidata.org/wiki/Special:EntityData/Q42.json (JSON)

If we split the file by the (real) line breaks, or more precisely by the character sequence "a comma (,) followed by a line break (\n)", we can parse almost all rows, as each of them will contain a single JSON document for a single item (the splitting characters are not part of the parser input). This is fortunate, since the JSON parser of a parallel processing unit sees only a single row as input, and this input must be valid JSON.

However, the first and the last rows can be problematic with their extra [ and ] characters. The JSON parser in (MultiLine)JsonExtractor does not mind the leading [: it interprets it as the start of an array and moves on, looking for the array members. It finds the JSON object in the first row, then runs out of input and stops processing, unconcerned about the missing ]. The last row starts with a JSON object, but then the parser hits the ], a token that is invalid in this context (a matching [ should have been read before), so it throws. To avoid that, we should stop processing the input after reading one JSON document. Remember, this behavior is configurable in the (MultiLine)JsonExtractor.

Let's sum up the extraction problems with Wikidata dump and the solutions.

  • parallel processing of lines -> MJE handles this out of the box; we just have to specify the line delimiter. The default line delimiter encoding is UTF-8.
  • superfluous characters in the first/last line -> configure MJE to stop processing after one parsed JSON document
  • output rows larger than 128 kB -> use byte array typed columns and configure MJE to use the Mrys-json-branch extractor mode, a.k.a. BytesString mode

We now have the full MJE configuration for processing the Wikidata dump. As MJE is not a built-in extractor, we have to build a .NET assembly containing it and register that assembly in ADLA before we can use it. Just follow the Building and Deploying sections of the extractor documentation.
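
If you would rather register the assemblies from a script than through the IDE tooling, the registration boils down to a couple of CREATE ASSEMBLY statements. The paths below are an assumption - adjust them to the ADLS folder where you uploaded the built DLLs, and pick your own target database instead of master if you prefer:

// Register the JSON sample assembly and its Newtonsoft.Json dependency
// (assumes the DLLs were uploaded to an /Assemblies folder in the store).
USE DATABASE master;

CREATE ASSEMBLY IF NOT EXISTS [Newtonsoft.Json]
FROM "/Assemblies/Newtonsoft.Json.dll";

CREATE ASSEMBLY IF NOT EXISTS [Microsoft.Analytics.Samples.Formats]
FROM "/Assemblies/Microsoft.Analytics.Samples.Formats.dll";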

Now we finally get to write our query. To use the extractor, we first reference its assembly and then import its namespace. For the EXTRACT part, we supply the ingredients: the input file in the FROM clause, byte array columns to hold the compressed values of the top-level JSON fragments, and the configured extractor in the USING clause.

REFERENCE ASSEMBLY [Newtonsoft.Json];  
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

@RawData =
EXTRACT  
 [id] string
,[type] string
,[labels] byte[]
,[descriptions] byte[]
,[aliases] byte[]
,[claims] byte[]
,[sitelinks] byte[]
FROM "/wikidata-json-dump/wikidata-20180122-all.json"  
USING new MultiLineJsonExtractor(  
  linedelim:",\n"
  ,numOfDocsPerLine:1
  ,byteArrayProjectionMode:
      JsonExtractor.ByteArrayProjectionMode.BytesString);

Note that we could have projected everything into a single byte[] column, but by extracting id and type into their own columns, we will not need to parse the JSON again just to get them.

We now have the entire Wikidata dataset in the rowset named @RawData. We could process this raw data further in the processing phase, but what if we stored this extracted form as the input for all further Wikidata queries? That way, we would not need to rerun the extraction. Let's do exactly that: skip the processing phase and jump right to the output phase.

Processing phase - skipped!

As just said, there is no processing phase this time!

Output phase - replaced!

U-SQL provides two ways to persist a rowset: write it to a file (TXT, CSV, JSON, XML) or insert it into a U-SQL table. If we store the rowset in a file, we have to design the output file structure (number and format of the files, etc.), and every query has to use an extractor again. The other option? U-SQL offers a big-data-optimized data structure that abstracts away the file handling pains behind a convenient, easy-to-query logical data model: meet U-SQL tables.

U-SQL tables

At first glance, you can think of a U-SQL table as the big-data-optimized flavor of a standard relational (SQL) table. Some peculiarities of U-SQL tables:

  • If you want to insert data into the table (duh!), you must specify at least one clustered index.
  • If you specify an index, you must specify the partitioning/distribution scheme.
  • U-SQL table data is essentially immutable. You can alter the schema of the table, but not the data - apart from inserting new rows and truncating. Truncating removes all data from the table.

Behind the scenes, the table is backed by a bunch of ADLS files. The structure of the file storage is determined by the index and partition specification. The query performance is in turn determined by the storage structure, as it governs the parallel execution model of U-SQL. For more information on table partitioning, please refer to the documentation.

U-SQL tables can be used as inputs and/or outputs in U-SQL scripts. With U-SQL tables in the picture, our (now not so basic) U-SQL flow gets new alternatives:

  1. EXTRACT/query table phase: the extract step extracts a rowset from the input file - or queries a U-SQL table.
  2. PROCESSING phase: a pipeline of processing steps calculates the result.
  3. OUTPUT/insert into table phase: the output step writes results to the output file - or inserts them into a U-SQL table.

Based on this, our query flow will look like this:

  1. EXTRACT/query table phase: extract from the Wikidata JSON dump using an extractor
  2. PROCESSING phase: skipped
  3. OUTPUT/insert into table phase: INSERT into a U-SQL table (a.k.a. PROFIT)

Output to U-SQL table

We could create a table directly from the rowset produced in the EXTRACT phase - there is syntax for that - but let's design our table from scratch. We need three main ingredients:

  • Column specifications: this is easy - since we want to store the entire @RawData rowset, we reuse its column specifications
  • Index: remember, we need at least one clustered index. We do not know the characteristics of the query workload in advance, so we follow the rule of thumb here: index by the primary key - in this case, the id column
  • Partitioning: for similar reasons as above, we follow the simplest generic option: no partitions, only distributing rows to buckets (a.k.a. table distribution) based on the hash value of the unique identifier.

Let's assemble the ingredients to get the CREATE TABLE script. Run this script in a separate query window. Do not forget to select your ADLA account and your database in your IDE. You can set the number of AUs to 1-5 for this query - more on this number later.

CREATE TABLE dbo.WikidataImport  
(
     [Id] string
    ,[Type] string
    ,[Labels] byte[]
    ,[Descriptions] byte[]
    ,[Aliases] byte[]
    ,[Claims] byte[]
    ,[Sitelinks] byte[]
    ,INDEX cIX_ID CLUSTERED(Id ASC) DISTRIBUTED BY HASH(Id)
);

The output phase is now very straightforward if you are familiar with the classic SQL's INSERT INTO ... SELECT syntax:

INSERT INTO WikidataImport  
SELECT [id],  
       [type],
       [labels],
       [descriptions],
       [aliases],
       [claims],
       [sitelinks]
FROM @RawData;  

We now have the full script.

REFERENCE ASSEMBLY [Newtonsoft.Json];  
REFERENCE ASSEMBLY [Microsoft.Analytics.Samples.Formats];

USING Microsoft.Analytics.Samples.Formats.Json;

@RawData =
EXTRACT  
 [id] string
,[type] string
,[labels] byte[]
,[descriptions] byte[]
,[aliases] byte[]
,[claims] byte[]
,[sitelinks] byte[]
FROM "/wikidata-json-dump/wikidata-20180122-all.json"  
USING new MultiLineJsonExtractor(  
  linedelim:",\n"
  ,numOfDocsPerLine:1
  ,byteArrayProjectionMode:
      JsonExtractor.ByteArrayProjectionMode.BytesString);

INSERT INTO WikidataImport  
SELECT [id],  
       [type],
       [labels],
       [descriptions],
       [aliases],
       [claims],
       [sitelinks]
FROM @RawData;  

Executing in parallel - Data Lake Analytics Units

This is a massively parallelizable procedure - thanks to MJE, the input can be split along the line delimiters and parsed independently - hence, the execution can benefit from a higher number of parallel processing units. The parallelism of U-SQL is governed by a single number: the number of so-called Analytics Units (AUs). AUs represent the units of compute power that the ADLA processing units, called vertices, can utilize. The main concepts and some of the details are laid out in this blog post; here we just cite the main points:

When a vertex is run, it uses the compute resources of 1 AU. If a developer specifies N AUs to be allocated to the job, in effect this means that a maximum of N vertices can run at any given moment.

Consider a job with many vertices but that is allocated only 1 AU. This results in only one active vertex at any given point.

Consider a job that has 10 vertices but is allocated 100 AUs. The job is considered "over-allocated" - because it could in theory only ever use 10 vertices at any given moment - so 90 AUs are being wasted.

...

A U-SQL job is billed by the number of AU seconds it consumes.

The blog post above gives some guidelines for estimating the optimal number of AUs. This is not a simple topic: you have to balance cost (the total AU time consumed), utilization ratio, and execution time. Fortunately, the Data Lake tooling provides an analyzer that can estimate these metrics for different AU counts - you only have to run the query once to collect profiling data. We have done that already (with 40 AUs) - here are some results:
ADLA AU profiles

As you can see, the baseline of 40 AUs gives a great utilization ratio (97%) and ~24 minutes of runtime. In exchange for some extra cost (159 AUs, +26% AU hours), we would get a significantly lower execution time and a slightly lower utilization ratio (7.4 min, 75%). If your priority is execution time, you can allocate ~400 AUs, which gives you about 6 minutes of execution time but costs more than two and a half times as much (+256% AU hours) as the 40-AU version.

Based on these stats and your cost/time restrictions, you can choose your AU number. Let's run this query already! If you are using Visual Studio, you can monitor the progress on the Job Graph in the Job Browser window.

Job graph of the query

Check what we have done - sampling a table

Sorry, but there is no direct way to peek into a table. However, we can craft a query in no time that selects some random rows from the table and writes them to a file. The phases will look like this:

  1. EXTRACT/query table phase: query item rows from the import table.
  2. PROCESSING phase: sample the rowset from the previous phase.
  3. OUTPUT/insert into table phase: write the sampled rowset to a (CSV) file.

After this careful planning, the U-SQL code is rather simple. There is a built-in sampling operator in U-SQL. We just specify the ANY sampler and the number of rows we want, and it picks that many rows - arbitrarily.

@WDFilter=
SELECT  
 [Id]
,[Type]
,[Labels]
,[Descriptions]
,[Aliases]
,[Claims]
,[Sitelinks]
FROM WikidataImport  
WHERE Type=="item";

@JSONSample=
SELECT  
 [Id]
,[Type]
,[Labels]
,[Descriptions]
,[Aliases]
,[Claims]
,[Sitelinks]
FROM @WDFilter  
SAMPLE ANY (20);

OUTPUT @JSONSample  
TO "/wikidata-json-dump/wikidata-table.csv"  
USING Outputters.Csv(outputHeader:true,quoting:true);  

This is a very simple, low-cost query; you need only the minimum number of AUs to run it. After the job is completed, check the output CSV on the Data tab of the Job Browser window.

Preview of sample CSV

The byte arrays are encoded in hex format; you can examine them easily with online decompressing tools like CyberChef. A sample CyberChef pipeline is available here.

Next time

We have all the Wikidata data in our U-SQL table. The data is still in a rather crude form (compressed JSON fragments in columns), but as you will see, even over this structure, we can run ad-hoc analytical queries - thanks to the mighty power of U-SQL!

Stay tuned!

Acknowledgements

Thanks to

  • moczard for his contribution to the enhanced JsonExtractor and the MultilineJsonExtractor
  • kovacstibor for proofreading

Azure ADF icon