Elasticsearch et Symfony, export avec la fonction Scan/Scroll

Share Button

Read the English version

Elasticsearch vous permet de réaliser des recherches avancées.
Certains utilisateurs souhaiteront extraire le résultat de leurs recherches sous excel (ou LibreOffice Calc…) pour retraiter les données.

Comme nous l’avons expliqué lors de notre article Exporter des données dans un fichier csv, le but d’un export (réussi) est de limiter l’impact sur le serveur.

Dans Elasticsearch, nous pouvons utiliser les fonctions scan et scroll pour réaliser nos itérations.

Cette fonction est prévue pour itérer sur de gros volumes de données : voir la documentation ElasticSearch. L’utilisation principale est la réindexation de contenus, lors d’un changement de mapping par exemple.

Le principe du scan/scroll

1 – (Client) Une requête est envoyée au serveur Elasticsearch, en précisant un type de recherche “scan”, la durée maximum entre chaque itération, et la taille d’une itération.
2 – (Serveur) La réponse du serveur ne contient aucun enregistrement, mais un “scroll_id”. Une référence unique vers la première itération de données.
3 – (Client) Pour cette première itération on utilise le scoll_id pour envoyer une requête au serveur
4 – (Serveur) Le serveur renvoit les enregistrements concernant la première itération ainsi qu’un nouveau scroll_id pointant vers le second groupe de données.

Et ainsi de suite, jusqu’à épuisement des enregistrements.

La durée maximale fournie à ElasticSearch lui permet de savoir à partir de quand nettoyer ses “photos”.
Par exemple, pour un paramètre de 5s : Si aucune requête n’est effectuée sur un scroll_id dans les 5 secondes, la “photo” réalisée par ElasticSearch est supprimée.

La documentation décrit la fonction scroll comme “similaire à l’ouverture d’un curseur sur une base de données”.

Les limitations

La principale limitation de cette fonction est qu’il est impossible d’utiliser les facettes et les tris.
Ce qui n’est pas un problème lors d’un export de données brutes en csv.

Quelques exemples de requêtes

1 – Préparer le scroll à l’aide du search_type scan :
Requête client/serveur (1)

curl -XGET 'http://10.0.2.15:9200/obtao/article/_search?search_type=scan&scroll=10s&size=300' -d '
{ 
  "query" : {
      "match_all" : {} 
  }
}'

  • 10s de conservation des données entre 2 itérations
  • Itérations de 300 documents

2 – La réponse du serveur sera par exemple :
Réponse serveur (2)

{
 "_scroll_id" : "c2Nhbjs1OzUxMTk6cHFqYnowZzdSV09iN2tWS[...]zk7",
 "took":2,
 "timed_out":false,
 "_shards":{ 
    "total":5,
    "successful":5,
    "failed":0
  },
  "hits": {
    "total":3539,
    "max_score":0.0,
    "hits":[]
  }
}

On peut y lire : le scroll_id et le nombre total de résultats.
La “photos” est prise, nous pouvons maintenant itérer sur nos documents :

3 – Première itération
Requête client n°2 (3)

curl -XGET 'http://10.0.2.15:9200/_search/scroll?scroll=10s&scroll_id=c2Nhbjs1OzUxMTk6cHFqYnowZzdSV09iN2tWS[...]zk7'

Nous envoyons l’id du scroll ainsi qu’une nouvelle durée de péremption.
Remarquez que, lors de ces itérations, nous ne précisons pas dans l’URL l’index et le type. (Dans notre version ElasticSearch, cela fait même planter la requête)

4 – Réponse du serveur
Reponse serveur n°2 (4)

{
  "_scroll_id":   "c2Nhbjs1OzU0NDM6cHFqYno[...]",
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  }
  "hits": {
    "total": 3539,
    "max_score": 0,
    "hits": [
      {
        "_index": "obtao",
        "_type": "article",
        "_id" : 10,
        "_score" : 0 
        "_source" :
          {
            "id" : 10,
            "title" : "Test"
          } 
      },
      [...]
    ]
  }
}

On reçoit donc un nouveau scroll_id, que nous utiliserons lors de notre prochaine itération.

Le code PHP

Voici les quelques lignes PHP (via Elastica) qui retranscrivent ces requêtes.

Côté contrôleur, nous allons simplement déclarer une route accessible avec le _format “csv”.
Notre cas étant plus complexe, nous l’avons simplifié. N’hésitez pas à nous poser des questions au moindre problème rencontré.

<?php

// dans une classe Controller
public function exportAction(Request $request)
{
    // recupère l'index Article pour envoyer la requête au type Article
    $articleIndex = $this->get('fos_elastica.index.obtao_blog.article');

    // construit la requête dans le searchRepository, ici nous passons un objet ArticleSearch
    // vide car nous n'avons pas besoin de filtre particulier
    $articleQuery = $this->getSearchRepository()->getQueryForSearch(new ArticleSearch());

    // initialise et configure la fonction scan
    $exportScan = $articleIndex->search($articleQuery, array(
        \Elastica\Search::OPTION_SEARCH_TYPE => \Elastica\Search::OPTION_SEARCH_TYPE_SCAN,
        \Elastica\Search::OPTION_SCROLL => '30s',
        \Elastica\Search::OPTION_SIZE => '50',
    ));

    $em = $this->get('doctrine.orm.entity_manager');
    $elasticaIndex = $this->get('fos_elastica.index');
    $articleTransformer = $this->get('obtao.transformers.elastica.article');

    $response = new StreamedResponse(function() use($exportScan,$em,$elasticaIndex, $articleTransformer) {
        $total = $exportScan->getTotalHits();
        $countArticles = 0;

        // récupère le premier id de scroll
        $scrollId = $exportScan->getResponse()->getScrollId();

        $handle = fopen('php://output', 'r+');

        while ($countArticles <= $total) {
            // récupère le jeu de données pour l'id de scroll donné (l'id est valide 30secondes)
            $response = $elasticaIndex->search(null ,array(
                \Elastica\Search::OPTION_SCROLL_ID => $scrollId,
                \Elastica\Search::OPTION_SCROLL => '30s'
            ));
            // et récupère l'id de scroll pour le prochain jeu de données
            $scrollId = $response->getResponse()->getScrollId();

            $articles = $response->getResults();
            // s'il n'y a plus aucun article à récupérer
            if (count($articles) == 0) {
                break;
            }

            // transforme chaque résultat en objet Doctrine (optionnel)
            $articles = $articleTransformer->transform($articles);

            // insert les objets dans le fichier csv
            foreach ($articles as $article) {
                $a = array(
                    'id' => $article->getId(),
                    'title' => $article->getTitle(),
                    'content' => substr($article->getContent(), 0, 50).'...',
                    'publishedAt' => $article->getPublishedAt()->format('Y-m-d'),
                );
                fputcsv($handle, $a);
                $countArticles++;
            }
            // vide le gestionnaire d'entité pour garder la mémoire sous contrôle
            $em->clear();
        }

        fclose($handle);
    });

    $response->headers->set('Content-Type', 'application/force-download');
    $response->headers->set('Content-Disposition','attachment; filename="export.csv"');

    return $response;
}

Votre export est fonctionnel ! Un fichier csv est envoyé directement en stream au client et la mémoire reste sous contrôle.

Comme d’habitude, vous pouvez retrouver le code ci-dessus dans un contexte de projet bac à sable sur Github.

Share Button

2 thoughts on “Elasticsearch et Symfony, export avec la fonction Scan/Scroll

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>

Protected by WP Anti Spam