ElasticStatement.java 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package org.elasticsearch.jdbc;
  2. import com.google.common.collect.Lists;
  3. import org.elasticsearch.action.search.SearchRequestBuilder;
  4. import org.elasticsearch.action.search.SearchResponse;
  5. import org.elasticsearch.dsl.bean.ElasticSqlParseResult;
  6. import org.elasticsearch.dsl.parser.ElasticSql2DslParser;
  7. import org.elasticsearch.jdbc.search.JdbcSearchActionExecutor;
  8. import org.elasticsearch.jdbc.search.JdbcSearchResponse;
  9. import org.elasticsearch.search.SearchHit;
  10. import java.sql.Connection;
  11. import java.sql.ResultSet;
  12. import java.sql.SQLException;
  13. import java.util.List;
  14. public class ElasticStatement extends AbstractStatement {
  15. protected ElasticConnection connection;
  16. public ElasticStatement(ElasticConnection connection) {
  17. this.connection = connection;
  18. }
  19. protected ResultSet executeResult;
  20. @Override
  21. public ResultSet executeQuery(String sql) throws SQLException {
  22. return executeQuery(sql, null);
  23. }
  24. @Override
  25. public ResultSet executeQuery(String sql, Object[] args) throws SQLException {
  26. ElasticSqlParseResult parseResult;
  27. try {
  28. ElasticSql2DslParser sql2DslParser = new ElasticSql2DslParser();
  29. parseResult = sql2DslParser.parse(sql, args);
  30. }
  31. catch (Exception exp) {
  32. throw new SQLException(String.format("[ElasticStatement] Failed to parse sql[%s]", sql), exp);
  33. }
  34. SearchResponse searchResponse;
  35. try {
  36. SearchRequestBuilder searchRequest = parseResult.toRequest(connection.getClient());
  37. searchResponse = JdbcSearchActionExecutor.get().syncExecuteWithException(searchRequest);
  38. }
  39. catch (Exception exp) {
  40. throw new SQLException(String.format("[ElasticStatement] Failed to execute search request sql[%s]", sql), exp);
  41. }
  42. JdbcSearchResponse<String> jdbcSearchResponse = new JdbcSearchResponse<String>();
  43. jdbcSearchResponse.setFailedShards(searchResponse.getFailedShards());
  44. jdbcSearchResponse.setSuccessfulShards(searchResponse.getSuccessfulShards());
  45. jdbcSearchResponse.setTookInMillis(searchResponse.getTookInMillis());
  46. jdbcSearchResponse.setTotalShards(searchResponse.getTotalShards());
  47. jdbcSearchResponse.setTotalHits(searchResponse.getHits().getTotalHits());
  48. List<String> hits = Lists.newLinkedList();
  49. for (SearchHit searchHit : searchResponse.getHits().getHits()) {
  50. hits.add(searchHit.getSourceAsString());
  51. }
  52. jdbcSearchResponse.setDocList(hits);
  53. return executeResult = new ElasticResultSet(this, jdbcSearchResponse.toJson());
  54. }
  55. @Override
  56. public boolean execute(String sql) throws SQLException {
  57. executeQuery(sql);
  58. return true;
  59. }
  60. @Override
  61. public ResultSet getResultSet() throws SQLException {
  62. return executeResult;
  63. }
  64. @Override
  65. public Connection getConnection() throws SQLException {
  66. return connection;
  67. }
  68. }