import com.dderp.common.entity.material.MaterialInfo import com.dderp.common.entity.material.MaterialInfoType import com.dySweetFishPlugin.elasticsearch.ESClient import com.dySweetFishPlugin.sql.TableCodeNode import com.dySweetFishPlugin.sql.TableIdService import com.sweetfish.convert.json.JsonConvert import com.sweetfish.source.PageFlipper import com.sweetfish.util.Utility import org.apache.commons.lang3.StringUtils import org.apache.logging.log4j.Logger import com.dderp.business.dao.MaterialDao import com.dderp.common.api.NoSqlKeysService import com.dderp.common.datas.ESKeys import org.elasticsearch.action.bulk.BulkRequestBuilder import org.elasticsearch.action.bulk.BulkResponse import org.elasticsearch.action.index.IndexRequestBuilder import org.elasticsearch.common.xcontent.XContentType import org.rex.RMap static def createRunnable(ESClient esClient, NoSqlKeysService keysService, MaterialDao materialDao, TableIdService tableIdService, JsonConvert jsonConvert, Logger logger, String dataSourceId, long supplierCode) { return { RMap params = new RMap() //辅料类型档案 esClient.createIndex(keysService.getESKey(ESKeys.ES_ERP_MATERIALTYPE_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_MATERIALTYPE_TYPE, "materialtype.json")) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List materialInfoTypeList = materialDao.selectMaterialType(params, p, dataSourceId, supplierCode) if (!materialInfoTypeList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() materialInfoTypeList.forEach((x) -> { IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_ERP_MATERIALTYPE_INDEX, supplierCode), ESKeys.ES_ERP_MATERIALTYPE_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) if (StringUtils.isNotBlank(x.getMaterialTypeCode())) { tableIdService.addTableCodeNode(TableCodeNode.newBuilder() .codeName("tbMaterialInfo.code." + x.getMaterialTypeCode().trim()) .tableName("tbMaterialInfo") .codeFieldName("materialCode") .codePrefix(x.getMaterialTypeCode()) .codeYear(false) .yearLength(4) .codeMonth(false) .codeDay(false) .codeLength(5) .build(), true, dataSourceId, String.valueOf(supplierCode)) } }) BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引辅料分类出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引辅料分类,共" + materialInfoTypeList.size() + "条记录") } } }) //辅料档案 esClient.createIndex(keysService.getESKey(ESKeys.ES_ERP_MATERIALINFO_INDEX, supplierCode), ESKeys.INDEX_CONFIG, Utility.ofMap(ESKeys.ES_ERP_MATERIALINFO_TYPE, "materialinfo.json")) Ignore_ExecPageFlipper.execute(1000, { PageFlipper p -> List materialInfoList = materialDao.selectMaterialInfo(params, p, dataSourceId, supplierCode) if (!materialInfoList.isEmpty()) { BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk() materialInfoList.forEach((x) -> { IndexRequestBuilder indexRequest = esClient.getClient() .prepareIndex(keysService.getESKey(ESKeys.ES_ERP_MATERIALINFO_INDEX, supplierCode), ESKeys.ES_ERP_MATERIALINFO_TYPE) .setId(String.valueOf(x.getId())) .setSource(jsonConvert.convertTo(x), XContentType.JSON) bulkRequest.add(indexRequest) }) BulkResponse bulkResponse = bulkRequest.execute().actionGet() if (bulkResponse.hasFailures()) { logger.error("索引辅料出错:" + bulkResponse.buildFailureMessage()) } else { logger.info("索引辅料,共" + materialInfoList.size() + "条记录") } } }) } as Runnable } //groovy最后一个表达式的值为返回 createRunnable(esClient, keysService, materialDao, tableIdService, jsonConvert, logger, dataSourceId, supplierCode)