import com.dderp.business.dao.SupplierDao
import com.dderp.common.api.NoSqlKeysService
import com.dderp.common.datas.ESKeys
import com.dderp.common.entity.supply.SupplierInfo
import com.dySweetFishPlugin.elasticsearch.ESClient
import com.sweetfish.convert.json.JsonConvert
import com.sweetfish.source.PageFlipper
import com.sweetfish.util.Utility
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.bulk.BulkRequestBuilder
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.index.IndexRequestBuilder
import org.elasticsearch.common.xcontent.XContentType
import org.rex.RMap

/**
 * Builds a {@link Runnable} that indexes supplier records into Elasticsearch.
 *
 * When run, the task:
 * <ol>
 *   <li>resolves the index name from {@code ESKeys.ES_ERP_SUPPLIER_INFO_INDEX} and
 *       {@code supplierCode} via {@code keysService};</li>
 *   <li>calls {@code esClient.createIndex} for that index with {@code ESKeys.INDEX_CONFIG}
 *       and a type-to-mapping map pointing at {@code "supplierinfo.json"};</li>
 *   <li>pages through {@code supplierDao.querySupplierInfo} and sends every non-empty
 *       page as a single bulk index request, using each record's {@code getId()} as
 *       the document id and its JSON form as the document source.</li>
 * </ol>
 * Bulk failures are logged at error level and do not stop the paging loop.
 *
 * @param esClient     wrapper giving access to the Elasticsearch client and index creation
 * @param keysService  resolves the concrete index name for a supplier code
 * @param supplierDao  source of the supplier records to index
 * @param jsonConvert  serializes each record to the JSON document body
 * @param logger       receives the per-page success / failure messages
 * @param dataSourceId passed through to the DAO query
 * @param supplierCode passed to the DAO query and used to resolve the index name
 * @return the indexing task; nothing is executed until {@code run()} is called
 */
static def createRunnable(ESClient esClient,
                          NoSqlKeysService keysService,
                          SupplierDao supplierDao,
                          JsonConvert jsonConvert,
                          Logger logger,
                          String dataSourceId,
                          long supplierCode) {
    // Empty query-parameter map handed to every page query.
    RMap params = new RMap();

    return {
        // Resolved once per run instead of once per document: the arguments never change
        // inside the task. NOTE(review): assumes getESKey is a pure lookup -- confirm.
        String indexName = keysService.getESKey(ESKeys.ES_ERP_SUPPLIER_INFO_INDEX, supplierCode);

        esClient.createIndex(
                indexName,
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_ERP_SUPPLIER_INFO_TYPE, "supplierinfo.json"));

        // NOTE(review): Ignore_ExecPageFlipper is not imported in this script and its
        // contract is not visible here; 1000 is presumably the page size -- confirm.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List supplierInfoList = supplierDao.querySupplierInfo(params, p, dataSourceId, supplierCode);
            if (!supplierInfoList.isEmpty()) {
                // One bulk request per page.
                BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk();
                for (supplier in supplierInfoList) {
                    IndexRequestBuilder indexRequest = esClient.getClient()
                            .prepareIndex(indexName, ESKeys.ES_ERP_SUPPLIER_INFO_TYPE)
                            .setId(String.valueOf(supplier.getId()))
                            .setSource(jsonConvert.convertTo(supplier), XContentType.JSON);
                    bulkRequest.add(indexRequest);
                }

                BulkResponse bulkResponse = bulkRequest.execute().actionGet();
                if (bulkResponse.hasFailures()) {
                    logger.error("索引供应商档案:" + bulkResponse.buildFailureMessage());
                } else {
                    logger.info("索引供应商档案共" + supplierInfoList.size() + "条记录");
                }
            }
        });
    } as Runnable
}

// In Groovy the value of the script's last expression is its return value.
createRunnable(esClient, keysService, supplierDao, jsonConvert, logger, dataSourceId, supplierCode)