import Ignore_ExecPageFlipper as ExecPageFlipper
import com.dderp.business.dao.ProductDao
import com.dderp.common.api.NoSqlKeysService
import com.dderp.common.datas.ESKeys
import com.dderp.common.entity.product.WorkProcess
import com.dySweetFishPlugin.elasticsearch.ESClient
import com.sweetfish.convert.json.JsonConvert
import com.sweetfish.source.PageFlipper
import com.sweetfish.util.Utility
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.bulk.BulkRequestBuilder
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.index.IndexRequestBuilder
import org.elasticsearch.common.xcontent.XContentType
import org.rex.RMap

/**
 * Builds a {@link Runnable} that indexes work-process records into Elasticsearch.
 *
 * When run, the task first asks {@code esClient} to create the index whose name is
 * resolved through {@code keysService} for the given supplier, then walks the
 * work-process rows returned by {@code productDao.selectWorkProcess} page by page
 * (via {@code ExecPageFlipper.execute}) and submits one bulk index request per
 * non-empty page. Bulk failures are logged at error level; successes are logged
 * at info level together with the number of rows in the page.
 *
 * @param esClient      Elasticsearch client wrapper used to create the index and build requests
 * @param keysService   resolves the supplier-specific index name
 * @param productDao    DAO that supplies the work-process rows
 * @param jsonConvert   serializes each row into the JSON document source
 * @param logger        receives the per-page success / failure messages
 * @param dataSourceId  data source identifier forwarded to the DAO
 * @param supplierCode  supplier code forwarded to the key service and the DAO
 * @return the indexing task, coerced to {@link Runnable}
 */
static def createRunnable(ESClient esClient, NoSqlKeysService keysService, ProductDao productDao, JsonConvert jsonConvert, Logger logger, String dataSourceId, long supplierCode) {
    def indexingTask = {
        esClient.createIndex(
                keysService.getESKey(ESKeys.ES_ERP_WORK_PROCESS_INDEX, supplierCode),
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_ERP_WORK_PROCESS_TYPE, "workprocess.json"))

        RMap queryParams = new RMap()
        // 300 is handed to ExecPageFlipper as-is — presumably the page size; confirm against ExecPageFlipper.
        ExecPageFlipper.execute(300, { PageFlipper flipper ->
            List pageRows = productDao.selectWorkProcess(queryParams, flipper, dataSourceId, supplierCode)
            if (pageRows.isEmpty()) {
                // Nothing to index for this page.
                return
            }

            BulkRequestBuilder bulk = esClient.getClient().prepareBulk()
            for (row in pageRows) {
                IndexRequestBuilder docRequest = esClient.getClient().prepareIndex(
                        keysService.getESKey(ESKeys.ES_ERP_WORK_PROCESS_INDEX, supplierCode),
                        ESKeys.ES_ERP_WORK_PROCESS_TYPE)
                // The row id doubles as the document id.
                docRequest.setId(String.valueOf(row.getId()))
                docRequest.setSource(jsonConvert.convertTo(row), XContentType.JSON)
                bulk.add(docRequest)
            }

            // Blocks until the bulk request has completed.
            BulkResponse outcome = bulk.execute().actionGet()
            if (!outcome.hasFailures()) {
                logger.info("索引工艺档案,共" + pageRows.size() + "条记录")
            } else {
                logger.error("索引工艺档案出错:" + outcome.buildFailureMessage())
            }
        })
    }
    return indexingTask as Runnable
}

// In Groovy, the value of the last expression is the script's return value.
createRunnable(esClient, keysService, productDao, jsonConvert, logger, dataSourceId, supplierCode)