import com.dderp.business.dao.OutAssistDao
import com.dderp.common.api.NoSqlKeysService
import com.dderp.common.datas.ESKeys
import com.dderp.common.entity.outassist.OutAssistInfo
import com.dySweetFishPlugin.elasticsearch.ESClient
import com.sweetfish.convert.json.JsonConvert
import com.sweetfish.source.PageFlipper
import com.sweetfish.util.Utility
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.bulk.BulkRequestBuilder
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.index.IndexRequestBuilder
import org.elasticsearch.common.xcontent.XContentType
import org.rex.RMap

import java.time.Instant

/**
 * Builds a Runnable that indexes the records returned by
 * {@code outAssistDao.queryOutAssistInfo} into Elasticsearch for one supplier.
 *
 * When run, the task:
 * <ol>
 *   <li>resolves the index name via {@code keysService.getESKey} and calls
 *       {@code esClient.createIndex} with {@code ESKeys.INDEX_CONFIG} and the
 *       "outassistinfo.json" mapping entry;</li>
 *   <li>hands a per-page callback to {@code Ignore_ExecPageFlipper.execute};</li>
 *   <li>for each page, fills {@code createTimeLong}/{@code updateTimeLong} on every record
 *       (falling back to the current time when the source timestamp is null) and sends all
 *       records of the page in one bulk request, using the record id as document id;</li>
 *   <li>logs the bulk failure message, or the number of indexed records on success.</li>
 * </ol>
 *
 * @param esClient      Elasticsearch client wrapper used to create the index and send bulk requests
 * @param keysService   resolves the supplier-specific index name
 * @param outAssistDao  DAO queried page by page for the records to index
 * @param jsonConvert   serializes each record to the JSON document source
 * @param logger        receives the per-page success / failure messages
 * @param dataSourceId  forwarded unchanged to the DAO query
 * @param supplierCode  forwarded to the DAO query and used to resolve the index name
 * @return the indexing task as a {@link Runnable}; nothing is executed until it is run
 */
static def createRunnable(ESClient esClient, NoSqlKeysService keysService, OutAssistDao outAssistDao, JsonConvert jsonConvert, Logger logger, String dataSourceId, long supplierCode) {
    // Empty query parameters, created once and passed to every page query.
    RMap params = new RMap()
    return {
        // The index name depends only on supplierCode, so resolve it once per run
        // instead of once per document.
        def indexName = keysService.getESKey(ESKeys.ES_ERP_OUTASSIST_INFO_INDEX, supplierCode)

        esClient.createIndex(indexName, ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_ERP_OUTASSIST_INFO_TYPE, "outassistinfo.json"))

        // NOTE(review): 1000 is presumably the page size used by Ignore_ExecPageFlipper — confirm.
        Ignore_ExecPageFlipper.execute(1000, { PageFlipper p ->
            List outAssistInfoList = outAssistDao.queryOutAssistInfo(params, p, dataSourceId, supplierCode)
            // Groovy truth: false for both null and an empty list, so a null result
            // from the DAO no longer raises a NullPointerException.
            if (!outAssistInfoList) {
                return
            }

            BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk()
            for (def info : outAssistInfoList) {
                // Mirror the Date timestamps into epoch-millisecond fields; use "now" when absent.
                if (info.getCreateTime() != null) {
                    info.setCreateTimeLong(info.getCreateTime().getTime())
                } else {
                    info.setCreateTimeLong(Instant.now().toEpochMilli())
                }
                if (info.getUpdateTime() != null) {
                    info.setUpdateTimeLong(info.getUpdateTime().getTime())
                } else {
                    info.setUpdateTimeLong(Instant.now().toEpochMilli())
                }

                IndexRequestBuilder indexRequest = esClient.getClient()
                        .prepareIndex(indexName, ESKeys.ES_ERP_OUTASSIST_INFO_TYPE)
                        .setId(String.valueOf(info.getId()))
                        .setSource(jsonConvert.convertTo(info), XContentType.JSON)
                bulkRequest.add(indexRequest)
            }

            BulkResponse bulkResponse = bulkRequest.execute().actionGet()
            if (bulkResponse.hasFailures()) {
                logger.error("索引外协档案:" + bulkResponse.buildFailureMessage())
            } else {
                logger.info("索引外协档案共" + outAssistInfoList.size() + "条记录")
            }
        })
    } as Runnable
}

// In Groovy the value of the last expression is the script's return value.
createRunnable(esClient, keysService, outAssistDao, jsonConvert, logger, dataSourceId, supplierCode)