spuerx преди 9 години
родител
ревизия
6d81f188bd
променени са 26 файла, в които са добавени 1013 реда и са изтрити 337 реда
  1. 55 237
      create_index.sh
  2. 4 22
      pom.xml
  3. 1 1
      src/main/java/org/elasticsearch/dsl/bean/ElasticSqlParseResult.java
  4. 3 3
      src/main/java/org/elasticsearch/dsl/helper/ElasticSqlDateParseHelper.java
  5. 202 0
      src/main/java/org/elasticsearch/jdbc/AbstractFeatureNotSupportedPreparedStatement.java
  6. 6 1
      src/main/java/org/elasticsearch/jdbc/AbstractStatement.java
  7. 1 1
      src/main/java/org/elasticsearch/jdbc/ElasticConnection.java
  8. 19 5
      src/main/java/org/elasticsearch/jdbc/ElasticDriver.java
  9. 180 0
      src/main/java/org/elasticsearch/jdbc/ElasticPreparedStatement.java
  10. 1 1
      src/main/java/org/elasticsearch/jdbc/ElasticResultSetMetaData.java
  11. 51 26
      src/main/java/org/elasticsearch/jdbc/ElasticSingleConnectionDataSource.java
  12. 40 7
      src/main/java/org/elasticsearch/jdbc/ElasticStatement.java
  13. 0 4
      src/main/java/org/elasticsearch/jdbc/SearchActionExecutor.java
  14. 23 0
      src/main/java/org/elasticsearch/jdbc/exception/ResolveSearchResponseException.java
  15. 58 0
      src/main/java/org/elasticsearch/jdbc/search/JdbcSearchActionExecutor.java
  16. 73 0
      src/main/java/org/elasticsearch/jdbc/search/JdbcSearchResponse.java
  17. 80 0
      src/main/java/org/elasticsearch/jdbc/search/JdbcSearchResponseResolver.java
  18. 7 0
      src/main/java/org/elasticsearch/jdbc/search/TransportClientProvider.java
  19. 32 17
      src/main/java/org/elasticsearch/jdbc/TransportClientFactory.java
  20. 7 0
      src/main/java/org/elasticsearch/utils/Constants.java
  21. 3 1
      src/main/java/org/elasticsearch/client/ElasticMockClient.java
  22. 54 10
      src/test/java/org/elasticsearch/jdbc/ElasticDriverTest.java
  23. 23 0
      src/test/java/org/elasticsearch/jdbc/bean/Buyer.java
  24. 66 0
      src/test/java/org/elasticsearch/jdbc/bean/Product.java
  25. 23 0
      src/test/java/org/elasticsearch/jdbc/bean/Provider.java
  26. 1 1
      src/test/java/org/elasticsearch/query/SqlParserWhereConditionTest.java

+ 55 - 237
create_index.sh

@@ -9,81 +9,43 @@ curl -XPUT 'http://192.168.0.109:9200/index/' -d '{
 		}
 	},
 	"mappings": {
-		"library": {
+		"product": {
 			"properties": {
-				"name": {
+				"productName": {
 					"type": "string",
 					"index": "not_analyzed"
 				},
-				"manager": {
+				"productCode": {
+					"type": "string",
+					"index": "not_analyzed"
+				},
+				"minPrice": {
+					"type": "double"
+				},
+				"advicePrice": {
+					"type": "double"
+				},
+				"provider": {
 					"type": "object",
 					"properties": {
-						"managerName": {
+						"providerName": {
 							"type": "string",
 							"index": "not_analyzed"
 						},
-						"floors": {
-							"type": "nested",
-							"properties": {
-								"floorNum": {
-									"type": "integer"
-								},
-								"area": {
-									"type": "string",
-									"index": "not_analyzed"
-								}
-							}
+						"providerLevel" : {
+							"type": "integer"
 						}
 					}
 				},
-				"bookCategories": {
+				"buyers": {
 					"type": "nested",
 					"properties": {
-						"categoryName": {
-							"type": "string",
-							"index": "not_analyzed"
-						},
-						"categoryCode": {
+						"buyerName": {
 							"type": "string",
 							"index": "not_analyzed"
 						},
-						"books": {
-							"type": "nested",
-							"properties": {
-								"bookStock": {
-									"type": "integer"
-								},
-								"bookAuthor": {
-									"type": "string",
-									"index": "not_analyzed"
-								},
-								"bookName": {
-									"type": "string",
-									"index": "not_analyzed"
-								},
-								"bookPublisher" : {
-								    "type" : "object",
-								    "properties" : {
-								        "publisherName" : {
-								            "type" : "string",
-								            "index": "not_analyzed"
-								        },
-								        "publisherCode" : {
-								            "type" : "string",
-								            "index": "not_analyzed"
-								        },
-								        "bookProvider" : {
-                                            "type" : "nested",
-                                            "properties" : {
-                                                "providerName" : {
-                                                    "type" : "string",
-								                    "index": "not_analyzed"
-                                                }
-                                            }
-                                        }
-                                    }
-								}
-							}
+						"productPrice": {
+							"type": "double"
 						}
 					}
 				}
@@ -93,194 +55,50 @@ curl -XPUT 'http://192.168.0.109:9200/index/' -d '{
 }'
 
 
-curl -XPUT 'http://192.168.0.109:9200/index/library/1' -d '{
-	"name": "HBUT",
-	"manager": {
-		"managerName": "CN",
-		"floors": [{
-			"floorNum": 1,
-			"area": ["A","B"]
-		},
-		{
-			"floorNum": 2,
-			"area": ["A","B","C"]
-		}]
+curl -XPUT 'http://192.168.0.109:9200/index/product/1' -d '{
+	"productName" : "iphone 6s",
+	"productCode" : "IP_6S",
+	"minPrice" : 2288.00,
+	"advicePrice" : "6288.00",
+	"provider" : {
+		"providerName" : "foxconn",
+		"providerLevel" : 1
+	},
+	"buyers" : [{
+		"buyerName" : "china",
+		"productPrice" : 9288.00
 	},
-	"bookCategories": [{
-		"categoryName": "IT",
-		"categoryCode": "C001",
-		"books": [{
-			"bookName": "Java Core",
-			"bookStock": 22,
-			"bookAuthor": "jason"
-		},
-		{
-			"bookName": "Multi Thread",
-			"bookStock": 12,
-			"bookAuthor": "jason"
-		}]
+	{
+		"buyerName" : "usa",
+		"productPrice" : 3288.00
 	},
 	{
-		"categoryName": "ART",
-		"categoryCode": "C002",
-		"books": [{
-			"bookName": "Chinese 5000",
-			"bookStock": 13,
-			"bookAuthor": "bibic"
-		},
-		{
-			"bookName": "qgjq",
-			"bookStock": 18,
-			"bookAuthor": "lcy"
-		}]
+		"buyerName" : "japan",
+		"productPrice" : 4288.00
 	}]
 }'
 
-
-
-curl -XPUT 'http://192.168.0.109:9200/index/library/2' -d '{
-	"name" : "HZKJDX",
-	"manager" : {
-		"managerName": "lcy",
-		"floors": [{
-			"floorNum" : 1,
-			"area" : ["M", "X"]
-		},
-		{
-			"floorNum" : 2,
-			"area" : ["A"]
-		},
-		{
-			"floorNum" : 4,
-			"area" : ["N"]
-		}]
+curl -XPUT 'http://192.168.0.109:9200/index/product/2' -d '{
+	"productName" : "apple watch os2",
+	"productCode" : "AW_OS2",
+	"minPrice" : 1000.00,
+	"advicePrice" : "5000.00",
+	"provider" : {
+		"providerName" : "foxconn",
+		"providerLevel" : 1
 	},
-	"bookCategories" : [{
-		"categoryName" : "NEWS",
-		"categoryCode" : "C001",
-		"books" : [{
-			"bookName" : "cqcb",
-			"bookStock" : 22,
-			"bookAuthor" : "cq"
-		},
-		{
-			"bookName" : "yyxw",
-			"bookStock" : 12,
-			"bookAuthor" : "cq"
-		}]
+	"buyers" : [{
+		"buyerName" : "china",
+		"productPrice" : 9999.00
 	},
 	{
-		"categoryName" : "ART",
-		"categoryCode" : "C002",
-		"books" : [{
-			"bookName" : "Cinese 5000",
-			"bookStock" : 13,
-			"bookAuthor" : "bibicx",
-			"bookPublisher" : {
-			    "publisherName" : "CQ_PUB",
-			    "publisherCode" : "PUB_03",
-			    "bookProvider" : [{
-                    "providerName" : "PVD_01"
-                }]
-			}
-		},
-		{
-			"bookName" : "qgjq",
-			"bookStock" : 18,
-			"bookAuthor" : "lcy"
-		}]
+		"buyerName" : "usa",
+		"productPrice" : 4500.00
+	},
+	{
+		"buyerName" : "japan",
+		"productPrice" : 6000.00
 	}]
 }'
 
-
-curl -XPOST 'http://192.168.0.109:9200/index/_refresh'
-
-
-
-curl -XPOST 'http://192.168.0.109:9200/index/_search' -d '{
-  "query": {
-    "filtered": {
-      "filter": {
-        "nested": {
-          "path": "bookCategories",
-          "filter": {
-            "bool": {
-              "must": [
-                {
-                  "term": {
-                    "bookCategories.categoryName": "ART"
-                  }
-                }
-              ]
-            }
-          }
-        }
-      }
-    }
-  }
-}'
-
-
-curl -XPOST 'http://192.168.0.109:9200/index/_search' -d '{
-  "query": {
-    "filtered": {
-      "filter": {
-        "nested": {
-          "path": "manager.floors",
-          "filter": {
-            "bool": {
-              "must": [
-                {
-                  "term": {
-                    "manager.floors.floorNum": 1
-                  }
-                }
-              ]
-            }
-          }
-        }
-      }
-    }
-  }
-}'
-
-
-curl -XPOST 'http://192.168.0.109:9200/index/_search' -d '{
-  "query": {
-    "filtered": {
-      "filter": {
-        "nested": {
-          "filter": {
-            "bool": {
-              "must": [
-                {
-                  "term": {
-                    "bookCategories.categoryName": "ART"
-                  }
-                },
-                {
-                  "nested": {
-                    "filter": {
-                      "bool": {
-                        "must": {
-                          "terms": {
-                            "bookCategories.books.bookAuthor": [
-                              "bibicx"
-                            ]
-                          }
-                        }
-                      }
-                    },
-                    "path": "bookCategories.books"
-                  }
-                }
-              ]
-            }
-          },
-          "path": "bookCategories"
-        }
-      }
-    }
-  }
-}'
-
+curl -XPOST 'http://192.168.0.109:9200/index/_refresh'

+ 4 - 22
pom.xml

@@ -75,8 +75,6 @@
             <version>2.6</version>
         </dependency>
 
-
-
         <dependency>
             <groupId>org.elasticsearch</groupId>
             <artifactId>elasticsearch</artifactId>
@@ -85,12 +83,12 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>2.3</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.7</version>
         </dependency>
 
-		<dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.12</version>
@@ -134,22 +132,6 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.3</version>
-                <configuration>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <outputDirectory>${project.build.directory}/releases/</outputDirectory>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 </project>

+ 1 - 1
src/main/java/org/elasticsearch/dsl/bean/ElasticSqlParseResult.java

@@ -10,7 +10,7 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilder;
 import org.elasticsearch.search.sort.SortBuilder;
-import org.elasticsearch.client.ElasticMockClient;
+import org.elasticsearch.utils.ElasticMockClient;
 
 import java.util.List;
 

+ 3 - 3
src/main/java/org/elasticsearch/dsl/helper/ElasticSqlDateParseHelper.java

@@ -2,6 +2,7 @@ package org.elasticsearch.dsl.helper;
 
 import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr;
 import org.elasticsearch.dsl.exception.ElasticSql2DslException;
+import org.elasticsearch.utils.Constants;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -9,7 +10,6 @@ import java.util.Date;
 import java.util.regex.Pattern;
 
 public class ElasticSqlDateParseHelper {
-    public static final String DEFAULT_ES_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
 
     public static final Pattern SQL_DATE_REGEX_PATTERN_01 = Pattern.compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}");
     public static final Pattern SQL_DATE_REGEX_PATTERN_02 = Pattern.compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}");
@@ -44,7 +44,7 @@ public class ElasticSqlDateParseHelper {
 
     public static String formatDefaultEsDateObjectValue(Object date) {
         if (date instanceof Date) {
-            SimpleDateFormat dateFormat = new SimpleDateFormat(DEFAULT_ES_DATE_FORMAT);
+            SimpleDateFormat dateFormat = new SimpleDateFormat(Constants.DEFAULT_ES_DATE_FORMAT);
             return dateFormat.format(date);
         }
         throw new ElasticSql2DslException("[syntax error] Sql cannot support such date type: " + date.getClass());
@@ -55,7 +55,7 @@ public class ElasticSqlDateParseHelper {
             SimpleDateFormat dateFormat = new SimpleDateFormat(patternArg);
             Date date = dateFormat.parse(timeValArg);
 
-            dateFormat = new SimpleDateFormat(DEFAULT_ES_DATE_FORMAT);
+            dateFormat = new SimpleDateFormat(Constants.DEFAULT_ES_DATE_FORMAT);
             return dateFormat.format(date);
         }
         catch (ParseException pex) {

+ 202 - 0
src/main/java/org/elasticsearch/jdbc/AbstractFeatureNotSupportedPreparedStatement.java

@@ -0,0 +1,202 @@
+package org.elasticsearch.jdbc;
+
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.net.URL;
+import java.sql.*;
+import java.util.Calendar;
+
+public abstract class AbstractFeatureNotSupportedPreparedStatement extends ElasticStatement implements PreparedStatement {
+
+    public AbstractFeatureNotSupportedPreparedStatement(ElasticConnection connection) {
+        super(connection);
+    }
+
+    @Override
+    public int executeUpdate() throws SQLException {
+        throw new SQLFeatureNotSupportedException("executeUpdate");
+    }
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNull");
+    }
+
+
+    @Override
+    public void setBytes(int parameterIndex, byte[] x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setBytes");
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setAsciiStream");
+    }
+
+    @Override
+    public void setUnicodeStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setUnicodeStream");
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, int length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setBinaryStream");
+    }
+
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setObject");
+    }
+
+    @Override
+    public void addBatch() throws SQLException {
+        throw new SQLFeatureNotSupportedException("addBatch");
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, int length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setCharacterStream");
+    }
+
+    @Override
+    public void setRef(int parameterIndex, Ref x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setRef");
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, Blob x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setRef");
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Clob x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setRef");
+    }
+
+    @Override
+    public void setArray(int parameterIndex, Array x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setArray");
+    }
+
+    @Override
+    public void setURL(int parameterIndex, URL x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setURL");
+    }
+
+    @Override
+    public void setRowId(int parameterIndex, RowId x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setRowId");
+    }
+
+    @Override
+    public void setNString(int parameterIndex, String value) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNString");
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNCharacterStream");
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, NClob value) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNClob");
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setClob");
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setBlob");
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNClob");
+    }
+
+    @Override
+    public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setSQLXML");
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setObject");
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setAsciiStream");
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setBinaryStream");
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setCharacterStream");
+    }
+
+    @Override
+    public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setAsciiStream");
+    }
+
+    @Override
+    public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setBinaryStream");
+    }
+
+    @Override
+    public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setCharacterStream");
+    }
+
+    @Override
+    public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNCharacterStream");
+    }
+
+    @Override
+    public void setClob(int parameterIndex, Reader reader) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setClob");
+    }
+
+    @Override
+    public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setBlob");
+    }
+
+    @Override
+    public void setNClob(int parameterIndex, Reader reader) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNClob");
+    }
+
+    @Override
+    public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setNull");
+    }
+
+    @Override
+    public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setDate");
+    }
+
+    @Override
+    public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setTime");
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
+        throw new SQLFeatureNotSupportedException("setTimestamp");
+    }
+}

+ 6 - 1
src/main/java/org/elasticsearch/jdbc/AbstractStatement.java

@@ -1,6 +1,8 @@
 package org.elasticsearch.jdbc;
 
-import java.sql.*;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
 
 public abstract class AbstractStatement extends AbstractFeatureNotSupportedStatement {
 
@@ -121,4 +123,7 @@ public abstract class AbstractStatement extends AbstractFeatureNotSupportedState
     public final boolean isWrapperFor(final Class<?> iface) throws SQLException {
         return iface.isInstance(this);
     }
+
+
+    protected abstract ResultSet executeQuery(String sql, Object[] args) throws SQLException;
 }

+ 1 - 1
src/main/java/org/elasticsearch/jdbc/ElasticConnection.java

@@ -28,7 +28,7 @@ public class ElasticConnection extends AbstractConnection {
 
     @Override
     public PreparedStatement prepareStatement(String sql) throws SQLException {
-        return null;
+        return new ElasticPreparedStatement(this, sql);
     }
 
     @Override

+ 19 - 5
src/main/java/org/elasticsearch/jdbc/ElasticDriver.java

@@ -1,6 +1,9 @@
 package org.elasticsearch.jdbc;
 
-import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.jdbc.search.TransportClientProvider;
+import org.elasticsearch.jdbc.search.TransportClientProviderImpl;
+import org.slf4j.LoggerFactory;
 
 import java.sql.*;
 import java.util.Properties;
@@ -8,9 +11,13 @@ import java.util.logging.Logger;
 
 public class ElasticDriver implements Driver {
 
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(ElasticDriver.class);
+
     private static final String ELASTIC_SEARCH_DRIVER_PREFIX = "jdbc:elastic:";
 
-    private Client client;
+    private TransportClient transportClient = null;
+
+    private TransportClientProvider transportClientProvider;
 
     static {
         try {
@@ -27,9 +34,16 @@ public class ElasticDriver implements Driver {
 
     @Override
     public Connection connect(String url, Properties info) throws SQLException {
-        String ipUrl = url.substring(ELASTIC_SEARCH_DRIVER_PREFIX.length());
-        Client client = TransportClientFactory.createTransportClientFromUrl(ipUrl);
-        return new ElasticConnection(url, info, client);
+        synchronized (ElasticDriver.class) {
+            if (transportClientProvider == null) {
+                transportClientProvider = new TransportClientProviderImpl();
+            }
+            transportClient = transportClientProvider.createTransportClientFromUrl(url);
+            if (transportClient == null) {
+                throw new SQLException(String.format("ElasticDriver.connect] Failed to build transport client for url[%s]", url));
+            }
+        }
+        return new ElasticConnection(url, info, transportClient);
     }
 
     @Override

+ 180 - 0
src/main/java/org/elasticsearch/jdbc/ElasticPreparedStatement.java

@@ -0,0 +1,180 @@
+package org.elasticsearch.jdbc;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.elasticsearch.utils.Constants;
+
+import java.math.BigDecimal;
+import java.sql.*;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticPreparedStatement extends AbstractFeatureNotSupportedPreparedStatement {
+    private Map<Integer, SQLParam> paramMap = Maps.newHashMap();
+
+    private String sql;
+
+    public ElasticPreparedStatement(ElasticConnection connection, String sql) {
+        super(connection);
+        this.sql = sql;
+    }
+
+    @Override
+    public boolean execute() throws SQLException {
+        executeQuery();
+        return true;
+    }
+
+    @Override
+    public ResultSet executeQuery() throws SQLException {
+        if (paramMap.size() > 0) {
+            List<SQLParam> paramList = Lists.newArrayList(paramMap.values());
+            Collections.sort(paramList, new Comparator<SQLParam>() {
+                @Override
+                public int compare(SQLParam o1, SQLParam o2) {
+                    if (o1.getParamIndex() < o2.getParamIndex()) {
+                        return -1;
+                    }
+                    if (o1.getParamIndex() > o2.getParamIndex()) {
+                        return 1;
+                    }
+                    return 0;
+                }
+            });
+
+            List<Object> argList = Lists.transform(paramList, new Function<SQLParam, Object>() {
+                @Override
+                public Object apply(SQLParam sqlParam) {
+                    return sqlParam.getParamVal();
+                }
+            });
+
+            return executeQuery(sql, argList.toArray(new Object[argList.size()]));
+        }
+        return executeQuery(sql);
+    }
+
+    @Override
+    public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setByte(int parameterIndex, byte x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setShort(int parameterIndex, short x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setInt(int parameterIndex, int x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setLong(int parameterIndex, long x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setFloat(int parameterIndex, float x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setDouble(int parameterIndex, double x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setString(int parameterIndex, String x) throws SQLException {
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+    }
+
+    @Override
+    public void setDate(int parameterIndex, Date x) throws SQLException {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(Constants.DEFAULT_ES_DATE_FORMAT);
+        String dateStr = dateFormat.format(x);
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, dateStr));
+    }
+
+    @Override
+    public void setTime(int parameterIndex, Time x) throws SQLException {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(Constants.DEFAULT_ES_DATE_FORMAT);
+        String dateStr = dateFormat.format(x);
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, dateStr));
+    }
+
+    @Override
+    public void setTimestamp(int parameterIndex, Timestamp x) throws SQLException {
+        SimpleDateFormat dateFormat = new SimpleDateFormat(Constants.DEFAULT_ES_DATE_FORMAT);
+        String dateStr = dateFormat.format(x);
+        paramMap.put(parameterIndex, new SQLParam(parameterIndex, dateStr));
+    }
+
+    @Override
+    public void clearParameters() throws SQLException {
+        paramMap.clear();
+    }
+
+    @Override
+    public void setObject(int parameterIndex, Object x) throws SQLException {
+        if (x instanceof Date) {
+            setDate(parameterIndex, (Date) x);
+        }
+        else if (x instanceof Time) {
+            setTime(parameterIndex, (Time) x);
+        }
+        else if (x instanceof java.util.Date) {
+            SimpleDateFormat dateFormat = new SimpleDateFormat(Constants.DEFAULT_ES_DATE_FORMAT);
+            String dateStr = dateFormat.format(x);
+            paramMap.put(parameterIndex, new SQLParam(parameterIndex, dateStr));
+        }
+        else {
+            paramMap.put(parameterIndex, new SQLParam(parameterIndex, x));
+        }
+    }
+
+    @Override
+    public ResultSetMetaData getMetaData() throws SQLException {
+        return new ElasticResultSetMetaData();
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() throws SQLException {
+        return null;
+    }
+
+    private class SQLParam {
+
+        private int paramIndex;
+
+        private Object paramVal;
+
+        public SQLParam(int paramIndex, Object paramVal) {
+            this.paramIndex = paramIndex;
+            this.paramVal = paramVal;
+        }
+
+        public Object getParamVal() {
+            return paramVal;
+        }
+
+        public int getParamIndex() {
+            return paramIndex;
+        }
+
+    }
+}

+ 1 - 1
src/main/java/org/elasticsearch/jdbc/ElasticResultSetMetaData.java

@@ -6,7 +6,7 @@ import java.sql.Types;
 
 public class ElasticResultSetMetaData implements ResultSetMetaData {
 
-    protected static final String JSON_DATA_COL_NAME = "json_result";
+    protected static final String JSON_DATA_COL_NAME = "__INDEX_DOCS__";
 
     @Override
     public int getColumnCount() throws SQLException {

+ 51 - 26
src/main/java/org/elasticsearch/jdbc/ElasticSingleConnectionDataSource.java

@@ -1,6 +1,11 @@
 package org.elasticsearch.jdbc;
 
 
+import org.elasticsearch.client.Client;
+import org.elasticsearch.jdbc.search.TransportClientProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -10,12 +15,18 @@ import java.sql.SQLException;
 
 public class ElasticSingleConnectionDataSource extends DriverManagerDataSource implements SmartDataSource {
 
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSingleConnectionDataSource.class);
+
     private boolean suppressClose;
 
     private Connection target;
 
     private Connection connection;
 
+    private Client client;
+
+    private TransportClientProvider transportClientProvider;
+
     private final Object connectionMonitor = new Object();
 
     public ElasticSingleConnectionDataSource() {
@@ -27,42 +38,22 @@ public class ElasticSingleConnectionDataSource extends DriverManagerDataSource i
         this.suppressClose = suppressClose;
     }
 
-    public ElasticSingleConnectionDataSource(Connection target, boolean suppressClose) {
-        this.target = target;
-        this.suppressClose = suppressClose;
-        this.connection = (suppressClose ? getCloseSuppressingConnectionProxy(target) : target);
-    }
-
-    public ElasticSingleConnectionDataSource(String url, String username, String password, boolean suppressClose) {
-        super(url, username, password);
-        this.suppressClose = suppressClose;
+    public void setTransportClientProvider(TransportClientProvider transportClientProvider) {
+        this.transportClientProvider = transportClientProvider;
     }
 
     public void setSuppressClose(boolean suppressClose) {
         this.suppressClose = suppressClose;
     }
 
-
     protected boolean isSuppressClose() {
         return this.suppressClose;
     }
 
-
-    public void setAutoCommit(boolean autoCommit) {
-        // ignore
-    }
-
-
-    protected Boolean getAutoCommitValue() {
-        return Boolean.FALSE;
-    }
-
-
     @Override
     public Connection getConnection() throws SQLException {
         synchronized (this.connectionMonitor) {
             if (this.connection == null) {
-                // No underlying Connection -> lazy init via DriverManager.
                 initConnection();
             }
             if (this.connection.isClosed()) {
@@ -99,9 +90,25 @@ public class ElasticSingleConnectionDataSource extends DriverManagerDataSource i
         }
         synchronized (this.connectionMonitor) {
             closeConnection();
-            this.target = getConnectionFromDriver(getUsername(), getPassword());
-            prepareConnection(this.target);
-            this.connection = (isSuppressClose() ? getCloseSuppressingConnectionProxy(this.target) : this.target);
+
+            try {
+                if (transportClientProvider != null) {
+                    client = transportClientProvider.createTransportClientFromUrl(getUrl());
+                    if (client == null) {
+                        throw new SQLException(String.format("Failed to build transport client for url[%s]", getUrl()));
+                    }
+                    target = new ElasticConnection(getUrl(), null, client);
+                }
+                else {
+                    this.target = getConnectionFromDriver(getUsername(), getPassword());
+                }
+            }
+            catch (Exception exp) {
+                throw new SQLException(String.format("Failed to create connection for url[%s]", getUrl()), exp);
+            }
+
+            prepareConnection(target);
+            this.connection = (isSuppressClose() ? getCloseSuppressingConnectionProxy(target) : target);
         }
     }
 
@@ -129,7 +136,16 @@ public class ElasticSingleConnectionDataSource extends DriverManagerDataSource i
                 this.target.close();
             }
             catch (Throwable ex) {
-                //logger.warn("Could not close shared JDBC Connection", ex);
+                logger.warn("Could not close shared JDBC Connection", ex);
+            }
+        }
+
+        if (client != null) {
+            try {
+                client.close();
+            }
+            catch (Exception ex) {
+                logger.error("Could not close elasticsearch client", ex);
             }
         }
     }
@@ -145,6 +161,7 @@ public class ElasticSingleConnectionDataSource extends DriverManagerDataSource i
 
     private static class CloseSuppressingInvocationHandler implements InvocationHandler {
         private final Connection target;
+
         public CloseSuppressingInvocationHandler(Connection target) {
             this.target = target;
         }
@@ -192,4 +209,12 @@ public class ElasticSingleConnectionDataSource extends DriverManagerDataSource i
         }
     }
 
+    protected Boolean getAutoCommitValue() {
+        return Boolean.FALSE;
+    }
+
+    public void setAutoCommit(boolean autoCommit) {
+        // ignore
+    }
+
 }

+ 40 - 7
src/main/java/org/elasticsearch/jdbc/ElasticStatement.java

@@ -1,33 +1,66 @@
 package org.elasticsearch.jdbc;
 
+import com.google.common.collect.Lists;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.dsl.bean.ElasticSqlParseResult;
 import org.elasticsearch.dsl.parser.ElasticSql2DslParser;
+import org.elasticsearch.jdbc.search.JdbcSearchActionExecutor;
+import org.elasticsearch.jdbc.search.JdbcSearchResponse;
+import org.elasticsearch.search.SearchHit;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 
 public class ElasticStatement extends AbstractStatement {
-
     protected ElasticConnection connection;
 
     public ElasticStatement(ElasticConnection connection) {
         this.connection = connection;
     }
 
-    private ResultSet executeResult;
+    protected ResultSet executeResult;
 
     @Override
     public ResultSet executeQuery(String sql) throws SQLException {
-        ElasticSql2DslParser sql2DslParser = new ElasticSql2DslParser();
-        ElasticSqlParseResult parseResult = sql2DslParser.parse(sql);
+        return executeQuery(sql, null);
+    }
+
+    @Override
+    public ResultSet executeQuery(String sql, Object[] args) throws SQLException {
+        ElasticSqlParseResult parseResult;
+        try {
+            ElasticSql2DslParser sql2DslParser = new ElasticSql2DslParser();
+            parseResult = sql2DslParser.parse(sql, args);
+        }
+        catch (Exception exp) {
+            throw new SQLException(String.format("[ElasticStatement] Failed to parse sql[%s]", sql), exp);
+        }
+
+        SearchResponse searchResponse;
+        try {
+            SearchRequestBuilder searchRequest = parseResult.toRequest(connection.getClient());
+            searchResponse = JdbcSearchActionExecutor.get().syncExecuteWithException(searchRequest);
+        }
+        catch (Exception exp) {
+            throw new SQLException(String.format("[ElasticStatement] Failed to execute search request sql[%s]", sql), exp);
+        }
 
-        SearchRequestBuilder searchRequest = parseResult.toRequest(connection.getClient());
-        SearchResponse searchResponse = searchRequest.execute().actionGet();
+        JdbcSearchResponse<String> jdbcSearchResponse = new JdbcSearchResponse<String>();
+        jdbcSearchResponse.setFailedShards(searchResponse.getFailedShards());
+        jdbcSearchResponse.setSuccessfulShards(searchResponse.getSuccessfulShards());
+        jdbcSearchResponse.setTookInMillis(searchResponse.getTookInMillis());
+        jdbcSearchResponse.setTotalShards(searchResponse.getTotalShards());
+        jdbcSearchResponse.setTotalHits(searchResponse.getHits().getTotalHits());
 
-        return executeResult = new ElasticResultSet(this, searchResponse.toString());
+        List<String> hits = Lists.newLinkedList();
+        for (SearchHit searchHit : searchResponse.getHits().getHits()) {
+            hits.add(searchHit.getSourceAsString());
+        }
+        jdbcSearchResponse.setDocList(hits);
+        return executeResult = new ElasticResultSet(this, jdbcSearchResponse.toJson());
     }
 
     @Override

+ 0 - 4
src/main/java/org/elasticsearch/jdbc/SearchActionExecutor.java

@@ -1,4 +0,0 @@
-package org.elasticsearch.jdbc;
-
-public class SearchActionExecutor {
-}

+ 23 - 0
src/main/java/org/elasticsearch/jdbc/exception/ResolveSearchResponseException.java

@@ -0,0 +1,23 @@
+package org.elasticsearch.jdbc.exception;
+
+
+public class ResolveSearchResponseException extends RuntimeException {
+    public ResolveSearchResponseException() {
+    }
+
+    public ResolveSearchResponseException(String message) {
+        super(message);
+    }
+
+    public ResolveSearchResponseException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ResolveSearchResponseException(Throwable cause) {
+        super(cause);
+    }
+
+    public ResolveSearchResponseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

+ 58 - 0
src/main/java/org/elasticsearch/jdbc/search/JdbcSearchActionExecutor.java

@@ -0,0 +1,58 @@
+package org.elasticsearch.jdbc.search;
+
+import org.elasticsearch.action.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcSearchActionExecutor {
+
+    private static final Logger logger = LoggerFactory.getLogger(JdbcSearchActionExecutor.class);
+
+    private static final JdbcSearchActionExecutor JDBC_SEARCH_ACTION_EXECUTOR = new JdbcSearchActionExecutor();
+
+    private JdbcSearchActionExecutor() {
+
+    }
+
+    public static JdbcSearchActionExecutor get() {
+        return JDBC_SEARCH_ACTION_EXECUTOR;
+    }
+
+    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> Response syncExecuteWithException(RequestBuilder requestBuilder) {
+        ListenableActionFuture<Response> searchActionFuture = requestBuilder.execute();
+        searchActionFuture.addListener(defaultActionListener(requestBuilder));
+        return searchActionFuture.actionGet();
+    }
+
+    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> Response syncExecute(RequestBuilder requestBuilder) {
+        try {
+            ListenableActionFuture<Response> searchActionFuture = requestBuilder.execute();
+            searchActionFuture.addListener(defaultActionListener(requestBuilder));
+            return searchActionFuture.actionGet();
+        }
+        catch (Exception ex) {
+            return null;
+        }
+    }
+
+    public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void asyncExecute(RequestBuilder requestBuilder) {
+        ListenableActionFuture<Response> searchActionFuture = requestBuilder.execute();
+        searchActionFuture.addListener(defaultActionListener(requestBuilder));
+    }
+
+    private <Response extends ActionResponse> ActionListener<Response> defaultActionListener(ActionRequestBuilder requestBuilder) {
+        return new ActionListener<Response>() {
+            @Override
+            public void onResponse(Response response) {
+                logger.debug(String.format("[Search_Request] %s", requestBuilder.toString()));
+                logger.debug(String.format("[Search_Response] %s", response.toString()));
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {
+                logger.error("Execute search req error!", throwable);
+            }
+        };
+    }
+
+}

+ 73 - 0
src/main/java/org/elasticsearch/jdbc/search/JdbcSearchResponse.java

@@ -0,0 +1,73 @@
+package org.elasticsearch.jdbc.search;
+
+import com.google.gson.Gson;
+
+import java.util.List;
+
+public class JdbcSearchResponse<T> {
+    private int totalShards;
+    private int failedShards;
+    private int successfulShards;
+    private long tookInMillis;
+    private long totalHits;
+
+    private List<T> docList;
+
+    public int getTotalShards() {
+        return totalShards;
+    }
+
+    public long getTotalHits() {
+        return totalHits;
+    }
+
+    public void setTotalHits(long totalHits) {
+        this.totalHits = totalHits;
+    }
+
+    public void setTotalShards(int totalShards) {
+        this.totalShards = totalShards;
+    }
+
+    public int getFailedShards() {
+        return failedShards;
+    }
+
+    public void setFailedShards(int failedShards) {
+        this.failedShards = failedShards;
+    }
+
+    public int getSuccessfulShards() {
+        return successfulShards;
+    }
+
+    public void setSuccessfulShards(int successfulShards) {
+        this.successfulShards = successfulShards;
+    }
+
+    public long getTookInMillis() {
+        return tookInMillis;
+    }
+
+    public void setTookInMillis(long tookInMillis) {
+        this.tookInMillis = tookInMillis;
+    }
+
+    public List<T> getDocList() {
+        return docList;
+    }
+
+    public void setDocList(List<T> docList) {
+        this.docList = docList;
+    }
+
+    @Override
+    public String toString() {
+        return toJson();
+    }
+
+    public String toJson() {
+        return new Gson().toJson(this, JdbcSearchResponse.class);
+    }
+
+}

+ 80 - 0
src/main/java/org/elasticsearch/jdbc/search/JdbcSearchResponseResolver.java

@@ -0,0 +1,80 @@
+package org.elasticsearch.jdbc.search;
+
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.elasticsearch.jdbc.exception.ResolveSearchResponseException;
+import org.elasticsearch.utils.Constants;
+
+import java.util.Collections;
+import java.util.List;
+
+public class JdbcSearchResponseResolver {
+    private String oriSearchResponseGson;
+
+    public JdbcSearchResponseResolver(String oriSearchResponseGson) {
+        if (StringUtils.isBlank(oriSearchResponseGson)) {
+            throw new IllegalArgumentException("param[oriSearchResponseGson] can not be null");
+        }
+        this.oriSearchResponseGson = oriSearchResponseGson;
+    }
+
+    public String getOriSearchResponseGson() {
+        return oriSearchResponseGson;
+    }
+
+    public <T> JdbcSearchResponse<T> resolveSearchResponse(Class<T> clazz) throws ResolveSearchResponseException {
+        Gson defaultEsDateFormatGson = new GsonBuilder().setDateFormat(Constants.DEFAULT_ES_DATE_FORMAT).create();
+        return resolveSearchResponse(clazz, defaultEsDateFormatGson);
+    }
+
+    public <T> JdbcSearchResponse<T> resolveSearchResponse(Class<T> clazz, String dataPattern) throws ResolveSearchResponseException {
+        Gson defaultEsDateFormatGson = new GsonBuilder().setDateFormat(dataPattern).create();
+        return resolveSearchResponse(clazz, defaultEsDateFormatGson);
+    }
+
+    public <T> JdbcSearchResponse<T> resolveSearchResponse(Class<T> clazz, Gson gson) throws ResolveSearchResponseException {
+        return resolveSearchResponse(new ResolveStrategy<T>() {
+            @Override
+            public T resolve(String document) {
+                return gson.fromJson(document, clazz);
+            }
+        });
+    }
+
+    public <T> JdbcSearchResponse<T> resolveSearchResponse(ResolveStrategy<T> resolveStrategy) throws ResolveSearchResponseException {
+        JdbcSearchResponse<String> searchRespStrGson;
+        try {
+            searchRespStrGson = new Gson().fromJson(oriSearchResponseGson, new TypeToken<JdbcSearchResponse<String>>() {}.getType());
+        }
+        catch (Exception exp) {
+            throw new ResolveSearchResponseException(String.format("Failed to parse responseGson[%s] to JdbcSearchResponse", oriSearchResponseGson));
+        }
+
+        JdbcSearchResponse<T> jdbcSearchResponse = new JdbcSearchResponse<T>();
+        jdbcSearchResponse.setTotalShards(searchRespStrGson.getTotalShards());
+        jdbcSearchResponse.setTotalHits(searchRespStrGson.getTotalHits());
+        jdbcSearchResponse.setTookInMillis(searchRespStrGson.getTookInMillis());
+        jdbcSearchResponse.setFailedShards(searchRespStrGson.getFailedShards());
+        jdbcSearchResponse.setSuccessfulShards(searchRespStrGson.getSuccessfulShards());
+
+        if (CollectionUtils.isEmpty(searchRespStrGson.getDocList())) {
+            jdbcSearchResponse.setDocList(Collections.emptyList());
+            return jdbcSearchResponse;
+        }
+
+        List<T> docList = Lists.newLinkedList();
+        for (String doc : searchRespStrGson.getDocList()) {
+            docList.add(resolveStrategy.resolve(doc));
+        }
+        jdbcSearchResponse.setDocList(docList);
+        return jdbcSearchResponse;
+    }
+
+    public interface ResolveStrategy<T> {
+        T resolve(String document);
+    }
+}

+ 7 - 0
src/main/java/org/elasticsearch/jdbc/search/TransportClientProvider.java

@@ -0,0 +1,7 @@
+package org.elasticsearch.jdbc.search;
+
+import org.elasticsearch.client.transport.TransportClient;
+
+public interface TransportClientProvider {
+    TransportClient createTransportClientFromUrl(String url);
+}

+ 32 - 17
src/main/java/org/elasticsearch/jdbc/TransportClientFactory.java

@@ -1,40 +1,58 @@
-package org.elasticsearch.jdbc;
+package org.elasticsearch.jdbc.search;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.utils.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
 
-public class TransportClientFactory {
-
-    private static final String COMMA = ",";
-    private static final String COLON = ":";
+public class TransportClientProviderImpl implements TransportClientProvider {
     private static final int DEFAULT_ES_PORT = 9300;
 
-    private static final Map<String, TransportClient> clientMap = Maps.newConcurrentMap();
+    private static final Logger logger = LoggerFactory.getLogger(TransportClientProviderImpl.class);
+
+    private static final String ELASTIC_SEARCH_DRIVER_PREFIX = "jdbc:elastic:";
+
+    private final Map<String, TransportClient> clientMap = Maps.newHashMap();
 
-    private TransportClientFactory() {
+    public TransportClientProviderImpl() {
 
     }
 
-    public static TransportClient createTransportClientFromUrl(String url) {
+    public TransportClient createTransportClientFromUrl(final String url) {
         if (clientMap.containsKey(url)) {
             return clientMap.get(url);
         }
 
+        try {
+            TransportClient transportClient = internalBuildTransportClient(url);
+            clientMap.put(url, transportClient);
+        }
+        catch (Exception ex) {
+            logger.error(String.format("[TransportClientProviderImpl] Failed to build transport client for url[%s]", url), ex);
+        }
+
+        return clientMap.get(url);
+    }
+
+    private TransportClient internalBuildTransportClient(String url) {
+        String ipUrl = url.substring(ELASTIC_SEARCH_DRIVER_PREFIX.length());
+
         Settings.Builder settingBuilder = Settings.settingsBuilder();
         settingBuilder.put("client.transport.sniff", true);
 
-        String hostListString = url;
-        int clusterNamePosIdx = url.lastIndexOf("/");
+        String hostListString = ipUrl;
+        int clusterNamePosIdx = ipUrl.lastIndexOf("/");
         if (clusterNamePosIdx >= 0) {
             hostListString = hostListString.substring(0, clusterNamePosIdx);
-            settingBuilder.put("cluster.name", url.substring(clusterNamePosIdx + 1));
+            settingBuilder.put("cluster.name", ipUrl.substring(clusterNamePosIdx + 1));
         }
         else {
             settingBuilder.put("client.transport.ignore_cluster_name", true);
@@ -42,9 +60,9 @@ public class TransportClientFactory {
 
         List<InetSocketTransportAddress> addressList = Lists.newLinkedList();
 
-        String[] connStringList = hostListString.split(COMMA);
+        String[] connStringList = hostListString.split(Constants.COMMA);
         for (String connStr : connStringList) {
-            String[] connArr = connStr.split(COLON);
+            String[] connArr = connStr.split(Constants.COLON);
             if (connArr.length == 1) {
                 addressList.add(new InetSocketTransportAddress(new InetSocketAddress(connArr[0], DEFAULT_ES_PORT)));
             }
@@ -53,11 +71,8 @@ public class TransportClientFactory {
             }
         }
 
-        TransportClient transportClient = TransportClient.builder().settings(settingBuilder).build()
+        return TransportClient.builder().settings(settingBuilder).build()
                 .addTransportAddresses(addressList.toArray(new InetSocketTransportAddress[addressList.size()]));
 
-        clientMap.put(url, transportClient);
-
-        return clientMap.get(url);
     }
 }

+ 7 - 0
src/main/java/org/elasticsearch/utils/Constants.java

@@ -0,0 +1,7 @@
+package org.elasticsearch.utils;
+
+public class Constants {
+    public static final String COMMA = ",";
+    public static final String COLON = ":";
+    public static final String DEFAULT_ES_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+}

+ 3 - 1
src/main/java/org/elasticsearch/client/ElasticMockClient.java

@@ -1,4 +1,4 @@
-package org.elasticsearch.client;
+package org.elasticsearch.utils;
 
 import org.elasticsearch.action.*;
 import org.elasticsearch.action.bulk.BulkRequest;
@@ -41,6 +41,8 @@ import org.elasticsearch.action.termvectors.*;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.action.update.UpdateRequestBuilder;
 import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.client.support.Headers;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.threadpool.ThreadPool;

+ 54 - 10
src/test/java/org/elasticsearch/jdbc/ElasticDriverTest.java

@@ -1,23 +1,21 @@
 package org.elasticsearch.jdbc;
 
 
+import org.elasticsearch.jdbc.bean.Product;
+import org.elasticsearch.jdbc.search.JdbcSearchResponse;
+import org.elasticsearch.jdbc.search.JdbcSearchResponseResolver;
 import org.junit.Test;
 
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
+import java.sql.*;
 import java.util.Enumeration;
 
 public class ElasticDriverTest {
     private static final String driver = "org.elasticsearch.jdbc.ElasticDriver";
     private static final String url = "jdbc:elastic:192.168.0.109:9300/judge_cluster";
 
-
     @Test
     public void testLoadDriver() throws Exception {
         Class.forName(driver);
-
         Enumeration<Driver> driverEnumeration = DriverManager.getDrivers();
 
         while (driverEnumeration.hasMoreElements()) {
@@ -51,11 +49,57 @@ public class ElasticDriverTest {
         dataSource.setDriverClassName(driver);
 
         Connection connection = dataSource.getConnection();
-        ResultSet resultSet = connection.createStatement().executeQuery("select * from index.library where manager.managerName='lcy'");
+        ResultSet resultSet = connection.createStatement().executeQuery("select * from index.product where productCode='IP_6S'");
+
+        String responseGson = resultSet.getString(1);
+        JdbcSearchResponseResolver jdbcSearchResponseResolver = new JdbcSearchResponseResolver(responseGson);
+        JdbcSearchResponse<Product> jdbcSearchResponse = jdbcSearchResponseResolver.resolveSearchResponse(Product.class);
+
+        for (Product product : jdbcSearchResponse.getDocList()) {
+            System.out.println(product.getProductName());
+        }
+    }
+
+    @Test
+    public void testQuery2() throws Exception {
+        ElasticSingleConnectionDataSource dataSource = new ElasticSingleConnectionDataSource(url, true);
+        dataSource.setDriverClassName(driver);
+
+        Connection connection = dataSource.getConnection();
+        String sql = "select * from index.product where productCode=? and provider.providerLevel > ?";
+
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        preparedStatement.setString(1, "AW_OS2");
+        preparedStatement.setInt(2, 0);
+
+        ResultSet resultSet = preparedStatement.executeQuery();
+
+        String responseGson = resultSet.getString(1);
+        JdbcSearchResponseResolver jdbcSearchResponseResolver = new JdbcSearchResponseResolver(responseGson);
+        JdbcSearchResponse<Product> jdbcSearchResponse = jdbcSearchResponseResolver.resolveSearchResponse(Product.class);
+
+        for (Product product : jdbcSearchResponse.getDocList()) {
+            System.out.println(product.getProductName());
+        }
+    }
+
+    @Test
+    public void testPrefixQuery2() throws Exception {
+        ElasticSingleConnectionDataSource dataSource = new ElasticSingleConnectionDataSource(url, true);
+        dataSource.setDriverClassName(driver);
+
+        Connection connection = dataSource.getConnection();
+        String sql = "select * from index.product where prefix(productName, 'iphone') and $buyers.productPrice > 1000";
+
+        PreparedStatement preparedStatement = connection.prepareStatement(sql);
+        ResultSet resultSet = preparedStatement.executeQuery();
+
+        String responseGson = resultSet.getString(1);
+        JdbcSearchResponseResolver jdbcSearchResponseResolver = new JdbcSearchResponseResolver(responseGson);
+        JdbcSearchResponse<Product> jdbcSearchResponse = jdbcSearchResponseResolver.resolveSearchResponse(Product.class);
 
-        while(resultSet.next()) {
-            String json = resultSet.getString(1);
-            System.out.println(json);
+        for (Product product : jdbcSearchResponse.getDocList()) {
+            System.out.println(product.getProductName());
         }
     }
 }

+ 23 - 0
src/test/java/org/elasticsearch/jdbc/bean/Buyer.java

@@ -0,0 +1,23 @@
+package org.elasticsearch.jdbc.bean;
+
+public class Buyer {
+    private String buyerName;
+
+    private Integer productPrice;
+
+    public String getBuyerName() {
+        return buyerName;
+    }
+
+    public void setBuyerName(String buyerName) {
+        this.buyerName = buyerName;
+    }
+
+    public Integer getProductPrice() {
+        return productPrice;
+    }
+
+    public void setProductPrice(Integer productPrice) {
+        this.productPrice = productPrice;
+    }
+}

+ 66 - 0
src/test/java/org/elasticsearch/jdbc/bean/Product.java

@@ -0,0 +1,66 @@
+package org.elasticsearch.jdbc.bean;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+public class Product {
+    private String productName;
+
+    private String productCode;
+
+    private BigDecimal minPrice;
+
+    private BigDecimal advicePrice;
+
+    private Provider provider;
+
+    private List<Buyer> buyers;
+
+    public String getProductName() {
+        return productName;
+    }
+
+    public void setProductName(String productName) {
+        this.productName = productName;
+    }
+
+    public String getProductCode() {
+        return productCode;
+    }
+
+    public void setProductCode(String productCode) {
+        this.productCode = productCode;
+    }
+
+    public BigDecimal getMinPrice() {
+        return minPrice;
+    }
+
+    public void setMinPrice(BigDecimal minPrice) {
+        this.minPrice = minPrice;
+    }
+
+    public BigDecimal getAdvicePrice() {
+        return advicePrice;
+    }
+
+    public void setAdvicePrice(BigDecimal advicePrice) {
+        this.advicePrice = advicePrice;
+    }
+
+    public Provider getProvider() {
+        return provider;
+    }
+
+    public void setProvider(Provider provider) {
+        this.provider = provider;
+    }
+
+    public List<Buyer> getBuyers() {
+        return buyers;
+    }
+
+    public void setBuyers(List<Buyer> buyers) {
+        this.buyers = buyers;
+    }
+}

+ 23 - 0
src/test/java/org/elasticsearch/jdbc/bean/Provider.java

@@ -0,0 +1,23 @@
+package org.elasticsearch.jdbc.bean;
+
+public class Provider {
+    private String providerName;
+
+    private Integer providerLevel;
+
+    public String getProviderName() {
+        return providerName;
+    }
+
+    public void setProviderName(String providerName) {
+        this.providerName = providerName;
+    }
+
+    public Integer getProviderLevel() {
+        return providerLevel;
+    }
+
+    public void setProviderLevel(Integer providerLevel) {
+        this.providerLevel = providerLevel;
+    }
+}

+ 1 - 1
src/test/java/org/elasticsearch/query/SqlParserWhereConditionTest.java

@@ -10,7 +10,7 @@ import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
-import org.elasticsearch.client.ElasticMockClient;
+import org.elasticsearch.utils.ElasticMockClient;
 import org.junit.Assert;
 import org.junit.Test;