Elasticsearch and Symfony, export with the Scan/Scroll functions



Elasticsearch allows you to perform advanced searches. Some users may want to extract their search results to Excel (or LibreOffice Calc…) to work on the data.

As we explained in our post Export data to a csv file with Symfony, the goal of a (successful) export is to limit the load on the server.

With Elasticsearch, we can use the scan and scroll functions to iterate over the results.

These functions are designed to iterate over large sets of data: see the Elasticsearch documentation. Their main use is to reindex content, for example when you change your mapping.

How scan/scroll work

1 – (Client) A query is sent to the Elasticsearch server, specifying a “scan” search type, the maximum delay between two iterations and the size of each iteration.
2 – (Server) The server response does not contain any results, but a “scroll_id”. This is a unique reference to the first data iteration.
3 – (Client) For the first iteration, we use this scroll_id to send the request to the server.
4 – (Server) The server sends back the results that match the first iteration and a new scroll_id pointing to the second set of data.

And so on, until all results have been retrieved.
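As a sketch, the four steps above can be chained from the command line. The host, index and jq post-processing below are our own assumptions for illustration, not part of the original setup:

```shell
# Sketch of the scan/scroll loop (hypothetical host/index, jq installed).
# 1 - open the scan: the response contains no hits, only a _scroll_id
SCROLL_ID=$(curl -s -XGET 'http://localhost:9200/obtao/article/_search?search_type=scan&scroll=10s&size=300' \
  -d '{"query":{"match_all":{}}}' | jq -r '._scroll_id')

while true; do
  # 2..n - fetch the next page of results, using the last scroll_id
  PAGE=$(curl -s -XGET "http://localhost:9200/_search/scroll?scroll=10s&scroll_id=$SCROLL_ID")
  COUNT=$(echo "$PAGE" | jq '.hits.hits | length')
  # stop when a page comes back empty: every document has been read
  [ "$COUNT" -eq 0 ] && break
  # each response carries a fresh scroll_id for the next iteration
  SCROLL_ID=$(echo "$PAGE" | jq -r '._scroll_id')
  # do something with the hits, e.g. print them as CSV lines
  echo "$PAGE" | jq -r '.hits.hits[]._source | [.id, .title] | @csv'
done
```

Each iteration must come back within the scroll delay (10s here), otherwise the server frees the context and the scroll_id becomes invalid.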

The maximum duration given to Elasticsearch lets it know when it can clean up its “snapshots”.
For example, with a parameter of 5s: if no query is made with a scroll_id within 5 seconds, the “snapshot” taken by Elasticsearch is deleted.

The documentation describes the scroll function as “similar to opening a cursor against a database”.

Limitations

The main limitation of these functions is that it is impossible to use facets and sorting.
This is not a problem when exporting raw data to CSV.

Some example queries

1 – Prepare the scroll with the scan search_type :
Client/server request(1)

curl -XGET 'http://10.0.2.15:9200/obtao/article/_search?search_type=scan&scroll=10s&size=300' -d '
{ 
  "query" : {
      "match_all" : {} 
  }
}'

  • Keep the data for 10s between two iterations
  • Fetch 300 documents per shard at each iteration (with the scan search type, the size parameter applies to each shard)

2 – The server response will be, for example:
Server response (2)

{
 "_scroll_id" : "c2Nhbjs1OzUxMTk6cHFqYnowZzdSV09iN2tWS[...]zk7",
 "took":2,
 "timed_out":false,
 "_shards":{ 
    "total":5,
    "successful":5,
    "failed":0
  },
  "hits": {
    "total":3539,
    "max_score":0.0,
    "hits":[]
  }
}

You can see the scroll_id and the total number of results.
The “snapshot” is now taken; we can iterate over our documents:

3 – First iteration
Client request n°2 (3)

curl -XGET 'http://10.0.2.15:9200/_search/scroll?scroll=10s&scroll_id=c2Nhbjs1OzUxMTk6cHFqYnowZzdSV09iN2tWS[...]zk7'

We send our scroll_id and a new expiration duration. Note that, during the iterations, we do not specify the index and the type in the URL (in our version of Elasticsearch, adding them makes the request fail).

4 – Server response
Server response n°2 (4)

{
  "_scroll_id":   "c2Nhbjs1OzU0NDM6cHFqYno[...]",
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 3539,
    "max_score": 0,
    "hits": [
      {
        "_index": "obtao",
        "_type": "article",
        "_id" : 10,
        "_score" : 0,
        "_source" :
          {
            "id" : 10,
            "title" : "Test"
          } 
      },
      [...]
    ]
  }
}

We get a new scroll_id that we'll use in the next iteration.

The PHP code

Here are the few PHP lines (via Elastica) that correspond to these queries.

In the controller, we simply define a route with a _format parameter set to “csv”.
Our real use case being a bit more complex, we have simplified it here. Don't hesitate to ask questions if you run into any problems.

<?php

// in a Controller class
public function exportAction(Request $request)
{
    // get the Article index to send the query to the Article type
    $articleIndex = $this->get('fos_elastica.index.obtao_blog.article');

    // build the query in the SearchRepository. Here, we pass an empty ArticleSearch
    // object as we do not need particular filter
    $articleQuery = $this->getSearchRepository()->getQueryForSearch(new ArticleSearch());

    // init and configure the scan function
    $exportScan = $articleIndex->search($articleQuery, array(
        \Elastica\Search::OPTION_SEARCH_TYPE => \Elastica\Search::OPTION_SEARCH_TYPE_SCAN,
        \Elastica\Search::OPTION_SCROLL => '30s',
        \Elastica\Search::OPTION_SIZE => '50',
    ));

    $em = $this->get('doctrine.orm.entity_manager');
    $elasticaIndex = $this->get('fos_elastica.index');
    $articleTransformer = $this->get('obtao.transformers.elastica.article');

    $response = new StreamedResponse(function () use ($exportScan, $em, $elasticaIndex, $articleTransformer) {
        $total = $exportScan->getTotalHits();
        $countArticles = 0;

        // get the first scroll id
        $scrollId = $exportScan->getResponse()->getScrollId();

        $handle = fopen('php://output', 'w+');

        while ($countArticles <= $total) {
            // get the data set for the given scroll id (the scroll id is valid 30 seconds)
            $response = $elasticaIndex->search(null, array(
                \Elastica\Search::OPTION_SCROLL_ID => $scrollId,
                \Elastica\Search::OPTION_SCROLL => '30s'
            ));
            // and get the scroll id for the next data set
            $scrollId = $response->getResponse()->getScrollId();

            $articles = $response->getResults();
            // if there is no more article to fetch
            if (count($articles) == 0) {
                break;
            }

            // transform each result into a Doctrine object (optional)
            $articles = $articleTransformer->transform($articles);

            // insert the objects into the csv file
            foreach ($articles as $article) {
                $a = array(
                    'id' => $article->getId(),
                    'title' => $article->getTitle(),
                    'content' => substr($article->getContent(), 0, 50).'...',
                    'publishedAt' => $article->getPublishedAt()->format('Y-m-d'),
                );
                fputcsv($handle, $a);
                $countArticles++;
            }
            // clear the entity manager to keep the memory under control
            $em->clear();
        }

        fclose($handle);
    });

    $response->headers->set('Content-Type', 'application/force-download');
    $response->headers->set('Content-Disposition','attachment; filename="export.csv"');

    return $response;
}

Your export now works! A streamed CSV file is sent directly to the client and the memory stays under control.

As usual, you can find the code above in a sandbox project on Github.
