
Using Elasticsearch as the Primary Data Store in an ETL Pipeline

bol.com, the biggest e-commerce company in the Netherlands and Belgium,
set out on a four-year journey to rethink and rebuild
its entire ETL (Extract, Transform, Load)
pipeline, which has been cooking up the data used by its search engine since
the dawn of time. This more than a decade old, white-bearded giant, breathing
in the dungeons of shady Oracle PL/SQL hacks, was in a state of decay, causing
ever-increasing hiccups in production. A rewrite was inevitable. After
drafting many blueprints, we went for a Java service backed by Elasticsearch
as the primary storage!
This idea gave shivers to even the most senior
Elasticsearch consultants we hired, so to ease your mind I’ll walk you through
why we took such a radical approach and how we managed to escape our legacy.

Before diving into the details, let me share a 2,000ft overview of an
e-commerce search setup that will help you gain a better understanding of
the subjects discussed from here on. Note that this simplification totally omits
a nebula of caching layers, systems orchestrating multiple search
clusters, queues with custom flush and replay functionalities, in-place
resiliency mechanisms, services maintaining deprecated search entities to
avoid getting ranked down by bots due to 404s, circuit breakers, throttlers,
load balancers, etc. But it is still accurate enough to convey the general
idea.

[Figure: Architecture Overview]

[Before going any further, I want to take this opportunity to align you on
what exactly I mean by search. I hope this will help you better wrap
your mind around the ultimate consumer of the ETL. That being said, feel free to
skip this section and jump directly to the ETL deep dive in the next
section.]

Many people tend to make the mistake of taking a narrow view of search in
e-commerce and confining its use case to a mere term scavenging in a
mountainous stack of product attributes. While this holds to a
certain extent, it is merely the cherry sitting at the tip of an iceberg. (In
my Varnishing Search Performance
presentation, I tried to summarize how difficult it can get just to add a
caching layer between your search logic and backend.) There are books written,
university lectures offered, and computer science branches dedicated to the
matter. But let me try to briefly elaborate on this from an engineering
standpoint.

What is search anyway?

If I were to give a general, but far from complete, overview, search enables
one to

  • search for a term in hundreds of product attributes, where matching
    and ranking are curated with directly or indirectly available consumer
    (are you a PS4 owner searching for the newest “Call of Duty”?) and
    relevance (you probably meant a band by typing “The Doors”, which is
    irrelevant for “Doors & Windows” department) contexts,

  • browse (basically a search without a term) in thousands of categories
    with ranking mechanics similar to those used in the search described above,

  • beam up directly to a certain product or category given the input matches
    with certain patterns (EAN, ISBN, ISSN, etc.) or merchandising rules (any
    syntactic and/or semantic combination of “wine glasses” should end the
    flow in a particular department, etc.),

  • implicitly trigger multiple searches under the hood (e.g. narrowing
    down to a lower category or widening up to a higher category, etc.)
    to enhance the results,

  • and decorate every listing with faceting (you probably want to see
    “Capacity” facet rather than “Shoe Size” while searching/browsing in
    “Harddisks”) support.

So who consumes all of this? That is a big debate. But I know a handful of consumers for certain:

  • Customers: People who search and buy goods. They look harmless, until
    you get exposed to them on a Black Friday,
    when they work hand in hand in masses to DDoS
    the entire infrastructure.

  • Bots: They periodically (a couple of times a day at most, as of the
    date of this writing) try to digest your entire catalog into their system
    for two main purposes:

    • Integrate the catalog into their own search engine (that is, Google),
    • Tune their pricing strategy (that is, competitors)

    The worst part of handling bot traffic is that you cannot always throttle it
    (for instance, Google takes website latencies into account for rankings) and
    you need to make sure it does not harm the customer traffic. Food for
    thought: Imagine your customers swarming your shop on Christmas Eve
    while Google decides to spider your entire catalog with thousands of requests
    per second.

  • Partners: Your business partners can also scan your catalog
    periodically to integrate it into their own systems. (Fun fact: Some even
    require a daily Excel export.) One can classify them as bots that are only
    interested in a subset of the data.

  • Internal services: Last time I counted, there were 20+ internal
    services using search to enhance their results in addition to the
    users I listed above. Their usage can constitute up to 50% of the
    traffic.

In the case of partners and internal services, one might ask why they need
the search data rather than directly accessing the raw product attributes
and offers. The answer is simple: They also use additional attributes (e.g.,
facets, categories) incorporated in the ETL pipeline. Hence, rather than
exposing the internal ETL system to them, it is more convenient to serve them
from the search gateway, which is known to have battle-tested scalability and
resiliency measures.

What about performance?

As decades of experience in this domain show, making search 10 ms faster
can yield millions of euros in extra revenue, depending on the scale of your
business. Unfortunately, this equation works the other way around as well.
Hence, you are always expected to perform under a certain latency and above a
certain throughput threshold.

How volatile is the content?

Very, very, very volatile! I cannot emphasize this enough, and I believe this
is a crucial difference that sets e-commerce search apart from Google-like
search engines – recall the conflict between Google and Twitter over indexing
tweets. Maybe examples can help to convey the idea better:

  • A product might have multiple offers (bol.com offer, partner offer, etc.)
    featuring varying properties (pricing, deliverability, discounts, etc.)
    where both the offers and/or their properties are highly volatile. An offer
    might run out of stock, the price might change, etc. While customer-facing
    web pages are enhanced with the most recent data at runtime, the search
    index might lag behind and provide an eventually consistent view. The
    volatility in this context might range from seconds to months. At prime
    time, e.g. on Valentine’s Day, you don’t want your search engine to return
    gift listings that ran out of stock a couple of seconds ago.

  • Your manual (triggered by shop specialists) and automated (artificial
    intelligence, machine learning driven) processes can alter the category
    tree, add new facets, tune the exposure of existing facets, modify the
    search behavior (e.g., flows triggered by merchandising rules), add context
    sensitive (e.g. category-dependent) thesaurus entries, synonyms, introduce
    new rankings, etc. These changes might necessitate the update of millions of
    documents retroactively.

This volatility debate will take a prominent role in deciding on the
architecture of the next ETL pipeline, which I will elaborate on in a minute.

In the domain of search at e-commerce,
ETL denotes the
pipeline where the input is a multitude of information sources (product
attributes, offers, discounts, rankings, facets, synonyms, thesaurus entries,
etc.) and the output is the
denormalized input
constituting search-ready documents optimized for search query performance.
Wait a second: if an ETL pipeline merely serves optimization purposes,
doesn’t that sound like one can have search without it? Sort of… That
is indeed possible to a certain extent. Putting the details aside for
a moment, we can roughly compare the two approaches as follows:

Without ETL
  Advantages: Every change in the input sources takes immediate effect. (Hence,
  almost zero index-time cost.)
  Disadvantages: Latency and throughput suffer dramatically due to the join and
  enrichment operations required on the input sources at query time.

With ETL
  Advantages: Since all data potentially needed to satisfy search requests has
  already been baked into the index, search requires the least amount of effort
  to satisfy a request at query time.
  Disadvantages: Every change in the input sources necessitates pre-processing
  affecting anywhere from a couple of products to millions.

Put another way, ETL is all about the trade-off between index-time versus
query-time performance. In light of all this, and given that

  1. our existing ETL was functionally comprehensive enough,
  2. the query-time performance of Elasticsearch was already suffering due to
    faceting, internally triggered queries, etc. to the extent that external
    caching becomes a necessity,
  3. and search latency has a big impact on the revenue,

we took the thick ETL pipeline path.

But what is this ETL pipeline really? What does it literally do? In order to
answer these questions, let me focus your attention on the input sources going
into the ETL pipeline:

[Figure: ETL Input Sources]

[GPC stands for Global Product Classification,
which is the de facto commercial categorization of goods, ranging from a car
to a litre of milk.]

These two input sources, content and configuration, feature two totally
different execution patterns framing the functional requirements of the
potential ETL solutions, and hence play the most critical role in justifying
the plan we picked. Let’s examine them further:

Real-time Content Stream

Here the ETL pipeline listens to more than a dozen queues for updates
ranging from product attributes to offers, and from offer-specific discounts to
rankings, etc., all formatted in JSON. Fortunately, each
real-time content stream message triggers a single product update. Let me
exemplify this with a case: when the disk_capacity_bytes attribute of a product
changes, we (as also sketched in code right after this list)

  1. first fetch the relevant document from the storage,
  2. update its disk_capacity_bytes attribute,
  3. apply configuration(s) matching with the last state of the updated document,
  4. and persist the obtained result back.
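
A minimal sketch of how such a message handler might look is given below. The
EtlStore, ConfigurationIndex, and Configuration interfaces are hypothetical
placeholders that only hide the storage and configuration plumbing – they are
not the actual bol.com code – so that the four steps above stand out:

import java.util.List;
import java.util.Map;

// Hypothetical interfaces; they only sketch the flow of a single message.
interface EtlStore {
    Map<String, Object> fetch(String productId);              // step 1
    void persist(String productId, Map<String, Object> doc);  // step 4
}

interface ConfigurationIndex {
    // Inverse lookup: which configurations' predicates match this document?
    List<Configuration> matching(Map<String, Object> doc);
}

interface Configuration {
    Map<String, Object> apply(Map<String, Object> doc);       // execute the mutation
}

final class ContentStreamHandler {

    private final EtlStore store;
    private final ConfigurationIndex configurations;

    ContentStreamHandler(EtlStore store, ConfigurationIndex configurations) {
        this.store = store;
        this.configurations = configurations;
    }

    /** Handles a single real-time content stream message, e.g., a disk_capacity_bytes update. */
    void onAttributeUpdate(String productId, String attribute, Object value) {
        Map<String, Object> doc = store.fetch(productId);           // 1. fetch the document
        doc.put(attribute, value);                                   // 2. update the attribute
        for (Configuration config : configurations.matching(doc)) {  // 3. apply matching configurations
            doc = config.apply(doc);
        }
        store.persist(productId, doc);                               // 4. persist the result
    }
}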

There are some concerns that need to be addressed here:

  • This is a pretty CPU-intensive operation. Configurations, in essence, are
    rules in the form of (predicate, mutation) pairs defined via
    business-friendly screens by shop specialists. When an attribute of a
    document gets updated, this change might be of interest to many
    configurations, which are determined by performing an inverse lookup over
    tens of thousands of configuration predicates (e.g.,
    attrs.disk_capacity_bytes != null) matching the last state of the document.
    Later on, the mutations (e.g., doc.disk_capacity_gigabytes =
    attrs.disk_capacity_bytes / 1e9) of the found configurations are executed
    to let them shape the document according to their needs.

    This innocent looking procedure sneakily introduces two critical issues
    under the hood:

    1. How would you represent the configuration predicate such that you can
      match them against the content?
    2. How would you represent the configuration mutation such that you can
      execute them against the content?

    And it goes without saying, both of the aforementioned concerns need to be
    engineered efficiently: you are expected to repeat this procedure for each
    JSON message of the real-time content stream, where the traffic is in the
    order of millions of messages per day. (A minimal sketch of one possible
    (predicate, mutation) representation follows this list.)

    As a concrete configuration example, consider the following: You have two
    “Disk Capacity” facets defined by business: one for the computers
    department and one for the smart phones department. The first one
    translates disk_capacity_bytes into a disk_capacity_terabytes attribute,
    which is defined to be exposed when category == "computers", and the second
    translates it into a disk_capacity_gigabytes attribute, which is defined to
    be exposed when category == "smart phones". Here both configurations are
    executed when the attrs.disk_capacity_bytes != null predicate holds.

  • This operation needs to be performed atomically. Two concurrent operations
    touching the same product should not result in corrupt content.
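
To make the (predicate, mutation) pairing more tangible, here is a rough sketch
of how the two “Disk Capacity” configurations above could be modeled in plain
Java, with the predicate and the mutation both operating on the document source
represented as a Map. The class and field names are illustrative assumptions,
and an efficient inverse lookup index over such predicates is deliberately left
out:

import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// A configuration pairs a predicate with a mutation.
final class ConfigurationRule {
    final Predicate<Map<String, Object>> predicate;
    final UnaryOperator<Map<String, Object>> mutation;
    ConfigurationRule(Predicate<Map<String, Object>> predicate,
                      UnaryOperator<Map<String, Object>> mutation) {
        this.predicate = predicate;
        this.mutation = mutation;
    }
}

final class DiskCapacityRules {

    // Both "Disk Capacity" facet rules fire on the same predicate:
    // attrs.disk_capacity_bytes != null. (Which facet gets *exposed* per
    // department is a query-time decision, not an ETL one.)
    private static final Predicate<Map<String, Object>> HAS_DISK_CAPACITY =
            doc -> doc.get("disk_capacity_bytes") != null;

    // Facet for the computers department, in terabytes.
    static final ConfigurationRule TERABYTES = new ConfigurationRule(HAS_DISK_CAPACITY, doc -> {
        long bytes = ((Number) doc.get("disk_capacity_bytes")).longValue();
        doc.put("disk_capacity_terabytes", bytes / 1e12);
        return doc;
    });

    // Facet for the smart phones department, in gigabytes.
    static final ConfigurationRule GIGABYTES = new ConfigurationRule(HAS_DISK_CAPACITY, doc -> {
        long bytes = ((Number) doc.get("disk_capacity_bytes")).longValue();
        doc.put("disk_capacity_gigabytes", bytes / 1e9);
        return doc;
    });
}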

Configuration Stream

Configurations are the rules defined via business-friendly screens. The
modifications done there by shop specialists are published in snapshots once
they think the changes have grown into a stable state and are ready to be
exposed to the customer. Each published configuration snapshot ends up serving
three purposes:

  1. the search gateway uses it to determine how to query the search index,
  2. the ETL pipeline uses it to process the real-time content stream,
  3. and the ETL pipeline retroactively updates the documents that are
    potentially affected.

While the first two are relatively cheap operations, the last one is the
elephant in the room! This is the first time in our beautiful tale so far
that we need to propagate a change to millions of documents. Let me
explain this further with an example:

Let’s consider the following category definition:

if (attrs.gpc.family_id == 1234 && attrs.gpc.chunk_id == 5678) {
  doc.category = "books"
}

is modified as follows:

if (attrs.gpc.family_id == 1234 && attrs.gpc.chunk_id == 0xDEADBEEF) {
  doc.category = "AWESOME BOOKS"
}

Sir, you are in trouble! As the ETL pipeline, what you are expected to
deliver is to (roughly sketched in code right after this list)

  1. find products that are matching with the old predicate,
  2. revert the changes of the old configuration mutation by removing books from the category field,
  3. find products that are matching with the new predicate,
  4. and apply the changes of the new configuration mutation by adding AWESOME BOOKS to the category field.
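
A rough, self-contained sketch of these four steps follows. The Storage
interface is a hypothetical stand-in for whatever backs the ETL pipeline,
queried through a filter derived from the GPC predicates, and the category
field is modeled here as a multi-valued list for illustration; how such a
filter can be represented and executed efficiently is exactly the discussion
below:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CategoryRuleDeltaHandler {

    interface Storage {
        List<Map<String, Object>> findByGpc(long familyId, long chunkId);
        void persist(Map<String, Object> doc);
    }

    private final Storage storage;

    CategoryRuleDeltaHandler(Storage storage) {
        this.storage = storage;
    }

    @SuppressWarnings("unchecked")
    void apply() {
        // Steps 1 + 2: revert the old mutation on documents matching the old predicate.
        for (Map<String, Object> doc : storage.findByGpc(1234L, 5678L)) {
            List<String> categories = (List<String>) doc.get("category");
            if (categories != null) {
                // Only safe if no other rule adds "books" -- see the first concern below.
                categories.remove("books");
            }
            storage.persist(doc);
        }
        // Steps 3 + 4: apply the new mutation on documents matching the new predicate.
        for (Map<String, Object> doc : storage.findByGpc(1234L, 0xDEADBEEFL)) {
            List<String> categories =
                    (List<String>) doc.computeIfAbsent("category", key -> new ArrayList<String>());
            categories.add("AWESOME BOOKS");
            storage.persist(doc);
        }
    }
}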

This easier-said-than-done operation contains many implicit concerns:

  • The ETL needs to avoid removing books from the category field if there are
    rules, other than the changed one, adding books to the very same category
    field. There are two ways you can approach this:

    1. With every value added to a field, store a meta information pointing
      to the rules associated with that value. These back-tracking pointers
      optimize the check whether a value can be removed or not, with the cost
      of maintaining them in an ocean of values.

    2. After removing every value, put the product back into the ETL pipeline,
      just like handling products in the real-time content stream. If there are
      any rules, other than the changed one, adding books to the very same
      category field, they will kick in. This simple approach comes at the cost
      of CPU-intensive and, unfortunately, mostly redundant processing.

  • Given that configuration predicates are allowed to access any field, how
    would one represent a predicate and translate this into an ETL storage query
    filter that performs well? (You would not want to scan the whole data set
    for each predicate that is changed, right? Well… depends.)

    Let’s first discuss the representation of predicates, which was also a
    concern in the real-time content stream processing. Here you might first
    fall into the trap of whitelisting the operators (==, !=, >, >=,
    <, <=, ~=) and the content attributes (attrs.gpc.family_id,
    attrs.gpc.chunk_id, attrs.disk_capacity_bytes, etc.) that are allowed in
    configuration predicates. While whitelisting operators is fine, whitelisting
    the content attributes implies that the ETL pipeline, the configuration
    administration GUIs, etc. all need to have knowledge of this whitelist,
    which strictly depends on the structure of the real-time content stream
    messages. Whenever the message structures change or you want to add a new
    attribute to this whitelist – both happen a couple of times every year –
    you need to propagate this to many components in your service milky way
    and perform a deployment without downtime.

    What about translating these predicate representations into efficient ETL
    storage query filters? Let’s take the simplest approach: Represent each
    attribute with a separate field. Then let me ask you the following
    questions:

    1. If you were to opt for an RDBMS, you could represent attributes by
      columns and create an index for each individual column. (Ouch!) Thanks to
      the half-century of battle-tested RDBMS literature, the database can easily
      optimize and perform an index scan for the constructed queries:

      SELECT ...
        FROM content
       WHERE attrs_gpc_family_id = '1234'
         AND attrs_gpc_chunk_id = '5678'

      That being said… What if you hit the maximum column count limitation?
      (Yes, we did!) Further, what about attributes that are lists of objects:
      
        "authors": [
          
            "fname": "Volkan",
            "lname": "Yazici"
          ,
          
            "fname": "Lourens",
            "lname": "Heijs"
          
        ]
      

      You definitely cannot store these in a single column and still query each
      individual component. Ok, then you can normalize the data as follows:

      SELECT ...
        FROM content,
             attribute AS a1,
             attribute AS a2
       WHERE a1.content_id = content.id AND a1.name = 'gpc_family_id' AND a1.value = '1234'
         AND a2.content_id = content.id AND a2.name = 'gpc_chunk_id'  AND a2.value = '5678'

      So far so good. But… In a matter of months, you will need to start
      partitioning tables and maybe even move certain partitions into separate
      database instances to keep the latency under a certain threshold.
      (Yes, we did this as well!) This never-ending database structure
      optimization feels more and more like you are inventing your own
      distributed database on top of a plain RDBMS. Does this really still need
      to be this way in 2018?

    2. If you were to opt for MongoDB, just like with
      an RDBMS, you still need to create an explicit index on each (whitelisted)
      field. For filters involving multiple fields (e.g., attrs.gpc.family_id
      == 1234 && attrs.gpc.chunk_id == 5678), the MongoDB query optimizer can
      employ the individual field indices via index intersection.
      That being said, our experience with this feature has not been very
      pleasant.

      Attributes containing lists of objects are not a problem for MongoDB.

    3. If you were to opt for Google Cloud Datastore,
      you would need to create explicit indices for each potential filter
      combination – and order matters! Yes, you read that right! Let me
      exemplify this bizarre situation. If you have configurations with the
      following predicates:

      • attrs.gpc.family_id == 1234
      • attrs.gpc.chunk_id == 5678
      • attrs.gpc.family_id == 1234 && attrs.gpc.chunk_id == 5678
      • attrs.gpc.chunk_id == 5678 && attrs.gpc.family_id == 1234

      you need to define 4 different indices! Ouch! This on its own was a
      show stopper for Datastore in our case.

    4. If you were to opt for Elasticsearch,
      all fields are indexed by default and you can use them in any combination!
      Yay! No need for whitelisting! And similar to MongoDB, Elasticsearch also
      allows querying lists of objects; you just need to declare them explicitly
      as nested. If you don’t even want to worry about that, you can add a
      dynamic mapping template to make each object nested by default. (A sketch
      of a filter built on top of such a mapping follows this list.) The
      following is the index mapping you can use for that purpose:

      
        "date_detection": false,
        "dynamic_templates": [
          
            "strings": 
              "match_mapping_type": "string",
              "mapping": 
                "type": "keyword"
              
            
          ,
          
            "objects": 
              "match_mapping_type": "object",
              "mapping": 
                "type": "nested"
              
            
          
        ]
      

      The above mapping also disables analyzing fields of type string (they
      are mapped to keyword), since we are not interested in performing fuzzy
      queries. Likewise, date detection is disabled for similar reasons.

      These being said, Elasticsearch is known to suffer from deteriorating
      query performance over time when exposed to high update rates.

So far we have examined the current ETL setup with concrete examples for
several cases. We broke down the system into its individual input sources and
detailed their implications on certain architectural decisions. Let’s wrap up
these mind-boggling details into operational abstractions:

[Figure: The ETL: Operational Overview]

Given these operational abstractions, let me summarize the constraints the
configuration components (predicate and mutation) imply.

Configuration Mutations

If you recall, configuration mutations were simple document enhancement
instructions that I exemplified as follows:

doc.category = "books"

Here doc is a dictionary denoting the ETL’ed document source, and the mutation
“adds” the books value to its category field. This innocent-looking expression
(written in JavaScript for simplification purposes) can (and does!) grow to
unintended extents:

if (attrs.suitable_for_month <= 2) {
  doc.childhood_stage = "newborn";
} else if (attrs.suitable_for_month <= 12) {
  doc.childhood_stage = "infant";
} else if (attrs.suitable_for_month <= 48) {
  doc.childhood_stage = "toddler";
}

The mutation DSL
employed is expected to meet the following requirements:

  • It “must” support JSON input and output for the real-time content stream.
    (See step B4 in the figure.)
  • It “should” support ETL storage input and output for the configuration
    snapshot stream. (See step A4 in the figure.)

The reason the latter functionality is marked as optional is that the ETL
pipeline can also retrieve these documents raw from the storage, convert
them to JSON, execute the mutations, and persist them back again – assuming data
integrity is provided by other means, e.g., transactions, retries powered by
compare-and-swap operations, etc.

Configuration Predicates

Configuration predicates were simple conditions restricted to a
whitelisted set of operators (==, !=, >, >=, <, <=, ~=)
and supporting grouping:

attrs.gpc.family_id == 1234 && attrs.gpc.chunk_id == 5678

Similar to mutations, the predicate DSL used is expected to meet the
following requirements:

  • It “must” support JSON input for the real-time content stream. (See step B2
    in the figure.)
  • It “should” support ETL storage input for determining the affected documents
    by the configuration snapshot delta. (See step A4 in the figure.)

We relaxed the latter constraint since one can very well prefer to put the
entire stored document collection (Ouch!) back into the ETL pipeline, process
it, detect the changed documents, and persist the updates. This approach rests
on certain assumptions though:

  • We don’t need to perform this too often. That is, the frequency of
    configuration snapshots is relatively low, e.g., at most a couple of times
    a day.
  • The snapshot deltas affect a significant percentage of the entire collection
    to an extent that the advantage of finding and processing only the affected
    documents diminishes.

While you still need to make a back-of-the-envelope calculation of the cloud
bill for each approach, our years of statistics on ETL configuration snapshots
show that most of the time snapshot deltas affect at most 5% of the entire
collection, and on average less than 1% – thanks to the incremental updates
carried out by shop specialists. Hence, performing a complete ETL a couple of
times a day feels like overkill and hurts the engineer within you.

The old ETL was a single Oracle database where the configurations were modeled
in PL/SQL. Since the configuration abstraction language was the very same
language the database uses itself, executing mutations and predicates was
effortless. Hail SQL injection
as a feature! Though this came with some notable costs:

  • Using PL/SQL within the abstraction model created both functional and
    financial vendor lock-in. The functional deficiency (limited
    expressiveness, leakage of PL/SQL into irrelevant components) obstructed
    many innovations over the years, and it only became more difficult as time
    passed. Additionally, it constituted a significant obstacle for migrating
    the service to the cloud. Its financial aspect was negligible at the scale
    of bol.com.
  • Rolling back the changes of an updated configuration mutation is quite a
    PL/SQL engineering endeavor to implement in practice. This difficulty,
    spiced up with the insufficient logging, testing, debugging, profiling, etc.
    utilities, held programmers back from taking this path. Hence, there was a
    12+ hour long complete ETL run every night for configuration snapshot
    deltas. This beast, tamed by a couple of experienced engineers, had a
    reputation for frequent hiccups and for making bugs really difficult to
    find and reproduce, let alone fix!

In its previous incarnation, the content attributes were stored in <id,
content_id, key, value> normalized form. This approach started to suffer from
efficiency aches in the hinges pulling the ETL’ed data into the search index.
The Oracle consultants hired back then examined the usage and recommended going
with a denormalized structure where each attribute is stored as a column. In
addition to temporarily bandaging up the efficiency-related wounds, this
allowed DBAs to let their imaginations go wild in mapping the attributes to
columns. Recall the attributes composed of objects I mentioned above? Special
characters were used to create such multi-value attributes, which was pretty
much (to put it mildly) unpleasant. But the killer bullet came in the form of
a six-inch punch referred to as the maximum allowed column count limit.
But isn’t engineering all about how hard you can get hit and keep moving
forward? Yes, comrade! We thought so and used a
single binary XML column to store attributes, queried them using the Oracle
XPath toolbox, escaped attribute values, finally concatenated them into SQL
strings that were eventually executed, and for sure crossed our fingers.

There are a couple of important details that I could not manage to cover in
the above war diary without spoiling its coherence. Let me drop them here in
no particular order:

  • Task parallelization is pretty difficult in PL/SQL. We tried patching this
    hole via internal Oracle AQs, but I am not really sure whether it improved
    or worsened the state.
  • In a database procedure that is expected to run for 12+ hours, Murphy’s law
    works flawlessly. Anything that can go wrong, did, does, and will go wrong.
    We wisely(!) engineered the system to persist its state at certain
    checkpoints, constituting retriable handles to invoke when you come in
    in the morning and see that the ETL has crashed.
  • The number of moving components necessitated the use of a proprietary
    scheduling tool supporting Oracle. The schedule was glued together with
    bash scripts, designed in a proprietary development environment only
    available for Windows, and rolled out on Oracle machines running GNU/Linux.
    Neither GNU/Linux- nor Windows-using developers were fond of this
    situation.
  • Due to the high cost of a failing ETL, business also did not feel empowered
    to change and/or commercially optimize it easily. This was a pretty
    demotivating issue affecting both the technical and business people who
    needed to work with it.

Enough blaming the former engineers. We need to get our facts right. The
aforementioned PL/SQL giant was not rolled out in a day with a big bang. This
more than a decade old ETL pipeline was developed with all the best practices
and tooling available back then. The more you dive into its source code and
navigate through commits of features spanning years, the easier it becomes
to see what went wrong and where. You come to realize how the patterns that
necessitated exceptional handling of certain features – many of them due to
backward compatibility with legacy systems that have since been deprecated or
replaced by newcomers – exploded the complexity to unintended depths. Software
development is a never-ending progression, and the axioms you base your
initial architecture on become invalidated over time due to changing business
needs. Aiming for infinite flexibility comes with an engineering cost as well,
which might very well fall short of justifying the expense. One should also
add the massive burst of data volume and its update frequency to this list. I
personally think the old ETL pipeline and its engineers did a fantastic job.
The tool served its purpose for more than a decade and harvested an immense
amount of lessons for its successor. I would be more than happy if we as a
team manage to deliver such a long-living product as well.

Given our functional requirements, we evaluated a couple of different ETL
pipeline storage solutions, which I hinted at earlier.
The following is the feature matrix of the candidates:

Storage Solution         | Distributed | Sharded | Required Indices | Integrity Measure
PostgreSQL               | No          | No      | One [1]          | Transactions
PostgreSQL (partitioned) | No          | Yes [2] | One [1]          | Transactions
MongoDB                  | Yes         | Yes [3] | Some [4]         | Compare-and-swap [5]
Elasticsearch            | Yes         | Yes     | None             | Compare-and-swap [6]

[1] A PostgreSQL jsonb index covers all fields.
[2] PostgreSQL partitioning is not sharding in the distributed sense, but it still serves a similar purpose.
[3] MongoDB sharding requires manual configuration.
[4] MongoDB requires an explicit index for each whitelisted field allowed in ETL configuration predicates.
[5] MongoDB updateMany() or findAndModify() can be leveraged for the desired integrity.
[6] The Elasticsearch _version field can be leveraged to implement a compare-and-swap loop.

Benchmark Setup

For the benchmark, we populated each store with 33 million JSON documents,
each weighing 2.5 KB on average. One of the contrived fields in the
document is search_rank. Later on, a file consisting of 6 million distinct
<id, search_rank> pairs is streamed in batches of size 1000. For each batch,
we first fetch the old search_ranks associated with the ids and then bulk
update these with the new search_ranks. In this scenario, what we tried to
emulate is a bulk update triggered by a configuration snapshot delta, which
is the most storage-performance-demanding operation in the ETL pipeline.
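
In rough strokes, each benchmark run boils down to the loop below; the Store
interface is a hypothetical placeholder hiding the store-specific client code
(SQL, the MongoDB driver, or Elasticsearch mget/bulk calls):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SearchRankBenchmark {

    interface Store {
        Map<String, Long> fetchSearchRanks(List<String> ids);       // timed as "fetch"
        void updateSearchRanks(Map<String, Long> searchRanksById);  // timed as "update"
    }

    private static final int BATCH_SIZE = 1000;

    static void run(Store store, List<Map.Entry<String, Long>> idSearchRankPairs) {
        for (int offset = 0; offset < idSearchRankPairs.size(); offset += BATCH_SIZE) {
            List<Map.Entry<String, Long>> batch = idSearchRankPairs
                    .subList(offset, Math.min(offset + BATCH_SIZE, idSearchRankPairs.size()));
            List<String> ids = batch.stream()
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            // Fetch the old search_ranks first; the values are not used further,
            // the read is simply part of the emulated load.
            Map<String, Long> oldSearchRanks = store.fetchSearchRanks(ids);
            // Then bulk-update the batch with the new search_ranks.
            Map<String, Long> newSearchRanks = batch.stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            store.updateSearchRanks(newSearchRanks);
        }
    }
}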

The test bed used is a cluster composed of 6 dedicated machines with the
following specifications:

  • CPU: 16 core Intel Xeon E5-2620 v4 @ 2.10GHz
  • Memory/Swap: 128GB/16GB
  • Disk: 375GB (Intel P4800X Performance NVMe PCIe SSD)
  • Kernel: 3.10.0-693.1.1.el7.x86_64

We further configured each store as follows:

  • PostgreSQL: Just one PostgreSQL 9.6.10 instance
    (shared_buffers = 128MB) containing a single <id, content> table where
    content is of type jsonb.
    The benchmark is configured to update only the search_rank attribute within
    the content column.

  • PostgreSQL (partitioned): Same as above, but the content table is
    partitioned into 10 tables.

  • MongoDB: MongoDB 3.6 with the following configurations:

    systemLog.destination: file
    systemLog.logAppend: true
    processManagement.fork: true
    storage.engine: wiredTiger
    security.authorization: enabled
    replication.oplogSizeMB: 9216

    Note that sharding is not enabled. (More on this later.)

    Similar to the PostgreSQL setup, the benchmark is configured to update only
    the search_rank attribute of the documents.

  • Elasticsearch: Elasticsearch 6.3.0 with the following JVM flags:

    -Xms30g
    -Xmx30g
    -Xss256k
    -XX:NewRatio=3
    -XX:+UseParNewGC
    -XX:+UseConcMarkSweepGC
    -XX:CMSInitiatingOccupancyFraction=75
    -XX:+UseCMSInitiatingOccupancyOnly
    -XX:+PrintGCDetails
    -XX:+PrintGCDateStamps
    -XX:+PrintClassHistogram
    -XX:+PrintTenuringDistribution
    -XX:+PrintGCApplicationStoppedTime
    

    Here the JVM heap size is set to 30 GB due to the
    compressed OOPs limitation.

    Unlike the PostgreSQL and MongoDB setups, where only the search_rank
    attribute is updated, the Elasticsearch benchmark is configured to update
    the entire document. While this overkill hammers Elasticsearch way harder
    than the other stores (Elasticsearch will create quite some garbage segments
    waiting to be merged, and making every object nested worsens the case even
    more), it is more strategically aligned with how we want to use it in the
    future.

Benchmark Results

Below you will see the results of the benchmark for MongoDB and Elasticsearch
only. The PostgreSQL results were omitted because, no matter what kind of
optimization we threw at it, the benchmark always took more than 2 hours,
regardless of partitioning, whereas MongoDB and Elasticsearch took a couple of
minutes.

Store         | Conc. [7] | Latency   | Total (s) | Fetch [8] (ms)      | Update [9] (ms)
              |           |           |           | 75%  | 99%  | Max.  | 75% | 99%   | Max.
MongoDB       | 8         | total     | 518       | 68   | 999  | 3380  | 64  | 2347  | 4153
              |           | per batch |           | 8    | 125  | 423   | 8   | 293   | 519
              | 16        | total     | 526       | 71   | 3082 | 7905  | 68  | 5564  | 7955
              |           | per batch |           | 4    | 193  | 494   | 4   | 348   | 497
              | 32        | total     | 518       | 61   | 6668 | 11465 | 98  | 10533 | 13784
              |           | per batch |           | 2    | 208  | 358   | 3   | 329   | 431
Elasticsearch | 8         | total     | 251       | 278  | 423  | 798   | 94  | 186   | 412
              |           | per batch |           | 35   | 53   | 100   | 12  | 23    | 52
              | 16        | total     | 196       | 478  | 697  | 1004  | 141 | 266   | 410
              |           | per batch |           | 30   | 44   | 63    | 9   | 17    | 26
              | 32        | total     | 175       | 951  | 1368 | 1515  | 214 | 331   | 828
              |           | per batch |           | 30   | 43   | 47    | 7   | 10    | 26

[7] Number of concurrent batches.
[8] Time it takes to fetch a batch.
[9] Time it takes to update a batch.

Let me share some observations from the results:

  • Increasing concurrency improves Elasticsearch performance (up to 32
    concurrent batches) but does not have much effect on MongoDB.

  • Elasticsearch rocked in performance even though it is hammered with
    the update of the entire document whereas MongoDB is just trying to update a
    single attribute. Using 32 concurrent batches, it took 175s and 518s for
    Elasticsearch and MongoDB, respectively, to complete the benchmark.

  • Elasticsearch yielded way more predictable performance figures compared
    to MongoDB. Note the difference between 75- and 99-percentile figures.

  • Elasticsearch segment merges were unexpectedly stable during the runs,
    whereas we had anticipated them to become the bottleneck due to the high
    update rate. Meanwhile, compare-and-swap loops over the _version field
    provided the necessary data integrity without breaking a sweat. (A minimal
    sketch of such a loop follows this list.)
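
The compare-and-swap loop mentioned above can be sketched as follows. The
Storage interface is a hypothetical wrapper around the client plumbing: with
Elasticsearch, fetch would return the document source together with its
_version, and indexIfVersionMatches would issue an index request carrying that
version, reporting a version-conflict response as false:

import java.util.Map;
import java.util.function.UnaryOperator;

final class CompareAndSwapUpdater {

    interface Storage {
        VersionedDocument fetch(String id);
        boolean indexIfVersionMatches(String id, Map<String, Object> source, long expectedVersion);
    }

    static final class VersionedDocument {
        final long version;
        final Map<String, Object> source;
        VersionedDocument(long version, Map<String, Object> source) {
            this.version = version;
            this.source = source;
        }
    }

    private final Storage storage;

    CompareAndSwapUpdater(Storage storage) {
        this.storage = storage;
    }

    /** Applies the given update, retrying until no concurrent writer interferes. */
    void update(String id, UnaryOperator<Map<String, Object>> update) {
        while (true) {
            VersionedDocument document = storage.fetch(id);
            Map<String, Object> updatedSource = update.apply(document.source);
            if (storage.indexIfVersionMatches(id, updatedSource, document.version)) {
                return;  // nobody touched the document in between, we are done
            }
            // Version conflict: somebody else won the race; fetch the fresh state and retry.
        }
    }
}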

At the time of testing, we were initially not able to enable sharding in
MongoDB due to operational obstacles on our side. Still, the Elasticsearch
results were so promising – to the point of even shocking the hired
Elasticsearch consultants – that we decided to go with it, a store with which
we have years of production experience. If we put the whitelisted
configuration predicate fields problem aside – that is, the required explicit
indices on what can be queried – MongoDB could very well have been a viable
option as well.

But, really, why does Elasticsearch have a reputation of not being recommended
as a primary data store? I think it all started when the official project
website years ago contained an explicit statement admitting that Elasticsearch
is not intended to be used as a primary data store. Once, as the very owner of
the project itself, you admit this, it is really difficult to convince people
of the opposite – even if the situation has since improved. Later on, the
published Jepsen (an effort to improve the safety of distributed databases,
queues, consensus systems, etc.) reports (one from 2014-06-15 using
Elasticsearch 1.1.0 and the other from 2015-04-27 using Elasticsearch 1.5.0)
worsened the situation, and this bad reputation disseminated over the web at
the speed of light. While this tornado was DDoS’ing the entire Hacker News,
Proggit, etc. blogosphere with endless discussions in the form of “See? I
told ya so!”, the Elasticsearch team put up an Elasticsearch Resiliency
Status page. There they started sharing (even up to today!) known resiliency
problems, including the ones found in the Jepsen reports, converting them into
reproducible cases in GitHub issues, and tackling them one at a time. What
else would qualify as professional commitment if not this? Again, these were
all back in early 2015. Our Elasticsearch production deployments successfully
managed to return with a victory from every battle front thrown at them. It
did not always feel like a walk in the park. We had our hard times, though we
managed to overcome them and noted down the experience in the book of lessons
learnt. Let me share some common practices from that collection:

  • Security: Elasticsearch does not provide any security measures
    (encryption, etc.) out of the box. We do not use Elasticsearch to store
    any sort of PII.
  • Transactions: Elasticsearch does not have transaction support. We work
    around this by performing compare-and-swap loops over the _version field.
  • Tooling: Elasticsearch tooling is… just a piece of crap. It doesn’t
    have a proper development environment – you are stuck with running a
    fully blown Kibana just to be able to use its arcane Console.
    Its Java client drags in the entire milky way of Elasticsearch artifacts
    as a dependency, which is a JAR Hell time bomb waiting to explode.
    Further, the recently introduced high-level REST client leaks the Apache
    HTTP Client API models, etc. For the leaked models and transitive
    dependencies, there is not much you can do – you just learn to live with
    them. As for an IDE, you just keep a thick stack of HTTP request recipes
    in your favorite HTTP client, e.g., cURL, Postman, httpie, etc.
  • Documentation: Elasticsearch does not have documentation; PostgreSQL
    has documentation, MongoDB has documentation. What Elasticsearch has is a
    stack of surface-scratching blog posts served in the form of a
    documentation-like website. Elasticsearch also has an ocean of Stack
    Overflow and forum posts where you are allowed to swim at your
    convenience. That being said, one needs to admit that the situation is
    improving over time. (Yes, it was way worse!)
  • Resiliency: Yes, Elasticsearch can crash, just like any other piece of
    software. In order to address these emergencies, in addition to
    hot-standby clusters, we take regular snapshots and persist the messages
    processed by the ETL pipeline to a separate storage providing efficient
    write and bulk read operations, e.g., PostgreSQL, Google BigQuery, etc.
    In case of need, we just restore from a snapshot and replay the necessary
    set of messages to recover the lost state.

Is Elasticsearch the perfect tool for the job at hand? Not really. But it is
the one closest to that. We also know how to deal with each other – just like
in any other relationship.

By taking into account the ETL pipeline concerns detailed in previous
chapters, we derived a list of basic foundations that we aim to deliver:

  1. The configuration DSL must be abstract enough to avoid too much vendor
    lock-in. One must be able to represent configurations in this DSL such
    that applying them to a JSON document and/or the underlying storage unit
    is a matter of writing the necessary adapter classes.
  2. The storage must allow the ETL pipeline to query the entire collection
    using any possible filter combination allowed by the configuration
    predicate DSL. This is a crucial pillar in the design to enable real-time
    processing of every message, from both the content and the configuration
    snapshot stream, without necessitating an ETL run over the complete
    collection, which used to be the case in the old ETL pipeline.

Let me elaborate on how we addressed these deliverables.

The Primary Storage: Elasticsearch

The previous benchmark section already detailed the rationale behind employing
Elasticsearch as the primary storage. It is distributed and sharded by
default. It doesn’t require explicit indices on a whitelist of allowed
configuration predicate fields – every field is allowed to be queried by
default. It has no problems with querying fields containing a list of objects.
It provides sufficient leverage for data integrity via compare-and-swap loops
over _version fields. It is very efficient on bulk fetches and updates,
which was totally unexpected for us. Last, but not least, it is our bread and
butter in search and we have plenty of experience with it.

The Configuration DSL: JSON and Groovy

In the case of the configuration DSL, we wanted to stop the plague of PL/SQL
leaking all around the code base. For this purpose, we decided to go with the
model depicted below.

[Figure: The New Configuration DSL]

Here we replaced the SQL WHERE clauses, which were used to represent
configuration predicates in the old ETL pipeline, with JSON describing the
structure of the predicate. This new predicate representation, resembling
Elasticsearch filters, is translated into individual executors matching
against either JSON (coming from the real-time content stream) or the storage
engine, that is, Elasticsearch. Note that the way we represent the predicate
is independent of the medium (JSON, Elasticsearch, etc.) it is executed
against – we even implemented a MongoDB adapter at some point. An example
configuration predicate JSON is shown below, followed by a rough sketch of
the model it maps onto:


  "type": "nested",
  "path": ["content", "attribute"],
  "filter": 
    "type": "and",
    "filters": [
      
        "type": "nested",
        "path": ["gpc"],
        "filter": 
          "type": "and",
          "filters": [
            
              "type": "equals",
              "path": ["family_id"],
              "value": "1234"
            ,
            
              "type": "equals",
              "path": ["chunk_id"],
              "value": "5678"
            
          ]
        
      ,
      
        "type": "nested",
        "path": ["authors"],
        "filter": 
          "type": "and",
          "filters": [
            
              "type": "equals",
              "path": ["fname"],
              "value": "Volkan"
            ,
            
              "type": "equals",
              "path": ["lname"],
              "value": "Yazici"
            
          ]
        
      
    ]
  
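The rough shape of the predicate model behind such a JSON might look as
follows; the class and method names are illustrative, not the actual bol.com
code. Concrete executors – a JSON matcher for the real-time content stream, an
Elasticsearch filter builder for the storage – are then written as visitors
over this hierarchy:

import java.util.List;

interface ConfigurationPredicate {
    <T> T accept(PredicateVisitor<T> visitor);
}

interface PredicateVisitor<T> {
    T visit(NestedPredicate nested);
    T visit(AndPredicate and);
    T visit(EqualsPredicate equals);
}

final class NestedPredicate implements ConfigurationPredicate {
    final List<String> path;
    final ConfigurationPredicate filter;
    NestedPredicate(List<String> path, ConfigurationPredicate filter) {
        this.path = path;
        this.filter = filter;
    }
    @Override public <T> T accept(PredicateVisitor<T> visitor) { return visitor.visit(this); }
}

final class AndPredicate implements ConfigurationPredicate {
    final List<ConfigurationPredicate> filters;
    AndPredicate(List<ConfigurationPredicate> filters) { this.filters = filters; }
    @Override public <T> T accept(PredicateVisitor<T> visitor) { return visitor.visit(this); }
}

final class EqualsPredicate implements ConfigurationPredicate {
    final List<String> path;
    final String value;
    EqualsPredicate(List<String> path, String value) {
        this.path = path;
        this.value = value;
    }
    @Override public <T> T accept(PredicateVisitor<T> visitor) { return visitor.visit(this); }
}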

As depicted above, we split the configuration mutation model into two
abstractions: extension and functional extension. An extension is the
simplest form of mutation and generally applies to more than 90% of the
available configurations. It is basically a JSON object that is, upon
execution, expected to be merged into the original source. A simple example is
as follows:


  "category": "books"

Functional extensions are built to address complex configuration mutations.
There we employed Groovy after experimenting
with some other candidates, e.g., JavaScript (Nashorn, which is planned to be
dropped), Python (Jython), Ruby (JRuby), etc.
The main drivers for us to pick Groovy were as follows:

  • It supports direct access to Java data structures (e.g., java.util.Map)
    without any intermediate translations, hence has no problems processing
    thousands of mutations on a single core.
  • It is widely adopted to an extent that in the future we might opt for
    running it against the storage engine.
  • Its runtime performance is on par with the rest of the candidates.

That being said, the choice of Groovy ties the ETL pipeline to the JVM,
though we do not anticipate this to be a problem for at least the coming
decade.

A sample functional extension is given below, followed by a sketch of how it
might be invoked from the Java ETL service.

static Map<String, Object> extend(Map<String, Object> source) {
    def diskCapacityBytes = (long) source.get("disk_capacity_bytes")
    def diskCapacityGigabytes = diskCapacityBytes / 1e9
    source.put("disk_capacity_gigabytes", diskCapacityGigabytes)
    return source
}
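
Below is a minimal sketch of how such a functional extension might be compiled
and invoked from the Java side via Groovy’s GroovyClassLoader; the runner class
is an illustrative assumption, and in practice the compiled class would
presumably be cached per configuration snapshot rather than recompiled per
document. Since Groovy operates directly on the java.util.Map document source,
no translation layer is needed:

import groovy.lang.GroovyClassLoader;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

final class FunctionalExtensionRunner {

    @SuppressWarnings("unchecked")
    static Map<String, Object> extend(String groovyScript, Map<String, Object> source) throws Exception {
        try (GroovyClassLoader classLoader = new GroovyClassLoader()) {
            // Compile the functional extension and invoke its static extend(Map) method.
            Class<?> extensionClass = classLoader.parseClass(groovyScript);
            Method extendMethod = extensionClass.getMethod("extend", Map.class);
            return (Map<String, Object>) extendMethod.invoke(null, source);
        }
    }

    public static void main(String[] args) throws Exception {
        String script =
                "static Map<String, Object> extend(Map<String, Object> source) {\n"
              + "    def diskCapacityBytes = (long) source.get(\"disk_capacity_bytes\")\n"
              + "    source.put(\"disk_capacity_gigabytes\", diskCapacityBytes / 1e9)\n"
              + "    return source\n"
              + "}\n";
        Map<String, Object> source = new HashMap<>();
        source.put("disk_capacity_bytes", 512_000_000_000L);
        // Prints the source enriched with disk_capacity_gigabytes (512.0).
        System.out.println(extend(script, source));
    }
}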

Implementing an e-commerce search engine is a tough business. The part of the
iceberg under the water level – that is, the ETL pipeline – is no less so.
In this post, I tried to share the lessons we piled up in the
implementation and maintenance of our decade-old ETL pipeline and how we
cultivated these to come up with something new. I attempted to explain how the
choice of the configuration DSL and the primary storage engine has the
utmost implications for the rest of the components of the architecture.
Elasticsearch has already been serving us pretty well in the search gateway.
Taking a step further and employing it in the ETL was a substantially
unconventional idea that gave the shivers to every engineer involved in the
decision. But the careful consideration and evaluation of potential candidates
paid off: It worked! So the next time you visit bol.com,
you will know that the Elasticsearch in the ETL pipeline – in addition to the
many other Elasticsearch-using services involved – cooked that warm page for
you seconds ago.

I would like to thank Berkay Buharalı, Lourens Heijs, William Leese,
Almer S. Tigelaar, Leon Widdershoven, and Maurice Zeijen for their valuable
feedback in bringing this post to its final form.
