import com.yinjie.heating.common.api.BusinessExecutor
import com.yinjie.heating.common.api.SupplierInitService
import com.yinjie.heating.common.datas.ERPModule
import com.yinjie.heating.common.datas.ESKeys
import com.yinjie.heating.common.entity.base.ProcessStringItem
import com.yinjie.heating.common.entity.order.*
import com.dySweetFishPlugin.elasticsearch.ESClient
import com.dySweetFishPlugin.sql.DBService
import com.sweetfish.convert.json.JsonConvert
import com.sweetfish.service.RetResult
import com.sweetfish.source.PageFlipper
import com.sweetfish.util.Utility
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.elasticsearch.action.bulk.BulkRequestBuilder
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.index.IndexRequestBuilder
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.common.xcontent.XContentType

import javax.annotation.Resource

/**
 * Business script that reads the supplier's "_Current" delivery-order tables from the
 * database, page by page, and bulk-indexes every row into the supplier's business-order
 * Elasticsearch index.
 *
 * Six tables are copied in order: the order itself, then detail items, delivery info,
 * finances, discounts and rider items. Rows of the last five tables are indexed with
 * their {@code idOrder} value as the Elasticsearch parent id.
 */
@SuppressWarnings(["unused"])
class BE_BusinessOrder_SyncDataBase2ES implements BusinessExecutor {

    /** Page size handed to ExecPageFlipper; each fetched page becomes one bulk request. */
    private static final int PAGE_SIZE = 500

    protected final Logger logger = LogManager.getLogger(this.getClass().getSimpleName())

    @Resource
    SupplierInitService supplierService

    @Resource
    ESClient esClient

    @Resource
    JsonConvert jsonConvert

    @Resource
    DBService dbService

    /** @return the display name of this script */
    @Override
    String scriptName() {
        return "外卖订单-ES同步数据库"
    }

    /** @return the ERP module this script is registered under */
    @Override
    ERPModule module() {
        return ERPModule.DATA_SYNC
    }

    /**
     * Runs the full database-to-Elasticsearch copy for one supplier.
     *
     * @param source supplies the data source id, the supplier code and the converter
     *               used to serialize {@code BusinessOrder} rows
     * @return always {@code RetResult.successT()}
     */
    @Override
    RetResult execute(ProcessStringItem source) {
        String dataSourceId = source.getDataSourceId()
        long supplierCode = source.getSupplierCode()

        // NOTE(review): the index checked here is resolved with offset 0, while documents
        // below are written to the index resolved with offset 1 — confirm both are intended.
        esClient.checkIndexEx(
                supplierService.getDateYearESIndex(supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 0),
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(
                        ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, "businessorder.json",
                        ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, "orderdeliveryinfo.json",
                        ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, "orderfinances.json",
                        ESKeys.ES_DELIVER_ORDER_DISCOUNT_TYPE, "orderdiscount.json",
                        ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, "orderdetailitem.json",
                        ESKeys.ES_DELIVER_ORDER_AFTER_SALE_BILL_TYPE, "orderaftersalebill.json",
                        ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, "orderrideritem.json"))

        // The target index is the same for every document, so resolve it once per run
        // instead of once per record.
        String targetIndex = supplierService.getDateYearESIndex(
                supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1)

        // Orders are serialized with the converter supplied by the request; every other
        // table uses the injected jsonConvert.
        Closure orderToJson = { row -> source.getConvert("BusinessOrder").convertTo(row) }
        Closure rowToJson = { row -> jsonConvert.convertTo(row) }

        syncTable(dataSourceId, "select * from deBusinessOrder${supplierCode}_Current", BusinessOrder.class,
                targetIndex, ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, false, orderToJson,
                "索引订单:", "索引订单共")

        syncTable(dataSourceId, "select * from deOrderDetailItem${supplierCode}_Current", OrderDetailItem.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, true, rowToJson,
                "索引订单detailItem:", "索引订单共detailItem")

        syncTable(dataSourceId, "select * from deOrderDeliveryInfo${supplierCode}_Current", OrderDeliveryInfo.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, true, rowToJson,
                "索引订单DeliveryInfo:", "索引订单共DeliveryInfo")

        syncTable(dataSourceId, "select * from deOrderFinances${supplierCode}_Current", OrderFinances.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, true, rowToJson,
                "索引订单OrderFinances:", "索引订单共OrderFinances")

        syncTable(dataSourceId, "select * from deOrderDiscount${supplierCode}_Current", OrderDiscount.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_DISCOUNT_TYPE, true, rowToJson,
                "索引订单OrderDiscount:", "索引订单OrderDiscount共")

        syncTable(dataSourceId, "select * from deOrderRiderItem${supplierCode}_Current", OrderRiderItem.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, true, rowToJson,
                "索引订单OrderRiderItem:", "索引订单共OrderRiderItem")

        // NOTE(review): bulk failures are only logged; the script still reports success.
        // Confirm whether callers should be told about partial failures.
        return RetResult.successT()
    }

    /**
     * Pages through one table and bulk-indexes each non-empty page into Elasticsearch.
     *
     * @param dataSourceId data source to query
     * @param sql          select statement for the table
     * @param entityClass  entity class the rows are mapped to
     * @param targetIndex  Elasticsearch index that receives the documents
     * @param esType       Elasticsearch type the documents are stored under
     * @param childOfOrder when true, each document's parent id is set from the row's {@code idOrder}
     * @param toJson       closure turning one row into the document source
     * @param errorPrefix  text logged before the bulk failure message
     * @param infoPrefix   text logged before the indexed-row count on success
     */
    private void syncTable(String dataSourceId, String sql, Class entityClass, String targetIndex, String esType,
                           boolean childOfOrder, Closure toJson, String errorPrefix, String infoPrefix) {
        ExecPageFlipper.execute(PAGE_SIZE, { PageFlipper p ->
            List dbList = dbService.getList(dataSourceId, sql, entityClass, p)
            if (!dbList.isEmpty()) {
                BulkRequestBuilder bulkRequest = esClient.getClient()
                        .prepareBulk()
                        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

                dbList.each { row ->
                    IndexRequestBuilder esRequest = esClient.getClient().prepareIndex(targetIndex, esType)
                    if (childOfOrder) {
                        esRequest.setParent(String.valueOf(row.idOrder))
                    }
                    esRequest.setId(String.valueOf(row.id))
                    esRequest.setSource(toJson(row), XContentType.JSON)
                    bulkRequest.add(esRequest)
                }

                BulkResponse bulkResponse = bulkRequest.execute().actionGet()
                if (bulkResponse.hasFailures()) {
                    logger.error(errorPrefix + bulkResponse.buildFailureMessage())
                } else {
                    logger.info(infoPrefix + dbList.size() + "条记录")
                }
            }
        })
    }
}