| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- import com.yinjie.heating.common.api.BusinessExecutor
- import com.yinjie.heating.common.api.SupplierInitService
- import com.yinjie.heating.common.datas.ERPModule
- import com.yinjie.heating.common.datas.ESKeys
- import com.yinjie.heating.common.entity.base.ProcessStringItem
- import com.yinjie.heating.common.entity.order.*
- import com.dySweetFishPlugin.elasticsearch.ESClient
- import com.dySweetFishPlugin.sql.DBService
- import com.sweetfish.convert.json.JsonConvert
- import com.sweetfish.service.RetResult
- import com.sweetfish.source.PageFlipper
- import com.sweetfish.util.Utility
- import org.apache.logging.log4j.LogManager
- import org.apache.logging.log4j.Logger
- import org.elasticsearch.action.bulk.BulkRequestBuilder
- import org.elasticsearch.action.bulk.BulkResponse
- import org.elasticsearch.action.index.IndexRequestBuilder
- import org.elasticsearch.action.support.WriteRequest
- import org.elasticsearch.common.xcontent.XContentType
- import javax.annotation.Resource
/**
 * Business executor that copies a supplier's "current" take-away order tables
 * from the relational database into Elasticsearch.
 *
 * <p>Six tables are read page by page ({@code deBusinessOrder}, {@code deOrderDetailItem},
 * {@code deOrderDeliveryInfo}, {@code deOrderFinances}, {@code deOrderDiscount} and
 * {@code deOrderRiderItem}, each suffixed with {@code <supplierCode>_Current}); every page
 * is written to Elasticsearch with one bulk request. Orders are indexed as top-level
 * documents; rows of the other five tables are indexed with their {@code idOrder} as
 * the parent id.
 */
@SuppressWarnings(["unused"])
class BE_BusinessOrder_SyncDataBase2ES implements BusinessExecutor<ProcessStringItem, ProcessStringItem> {

    /** First argument handed to {@code ExecPageFlipper.execute} (presumably the page size - confirm). */
    private static final int PAGE_SIZE = 500

    protected final Logger logger = LogManager.getLogger(this.getClass().getSimpleName())

    @Resource
    SupplierInitService supplierService

    @Resource
    ESClient esClient

    @Resource
    JsonConvert jsonConvert

    @Resource
    DBService dbService

    /** @return the display name of this script */
    @Override
    String scriptName() {
        return "外卖订单-ES同步数据库"
    }

    /** @return the ERP module this script is registered under ({@code DATA_SYNC}) */
    @Override
    ERPModule module() {
        return ERPModule.DATA_SYNC
    }

    /**
     * Runs the full database-to-Elasticsearch copy for the supplier described by {@code source}.
     *
     * @param source supplies the data source id, the supplier code and the converter used
     *               to serialize {@code BusinessOrder} rows
     * @return always {@code RetResult.successT()}; bulk failures are logged, not returned
     */
    @Override
    RetResult<ProcessStringItem> execute(ProcessStringItem source) {
        String dataSourceId = source.getDataSourceId()
        long supplierCode = source.getSupplierCode()

        // Arguments alternate between an ES type name and the mapping file registered for it.
        // NOTE(review): this check uses offset 0 while every write below uses offset 1 -
        // confirm both resolve to the index that is actually written.
        esClient.checkIndexEx(
                supplierService.getDateYearESIndex(supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 0),
                ESKeys.INDEX_CONFIG,
                Utility.ofMap(ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, "businessorder.json",
                        ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, "orderdeliveryinfo.json",
                        ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, "orderfinances.json",
                        ESKeys.ES_DELIVER_ORDER_DISCOUNT_TYPE, "orderdiscount.json",
                        ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, "orderdetailitem.json",
                        ESKeys.ES_DELIVER_ORDER_AFTER_SALE_BILL_TYPE, "orderaftersalebill.json",
                        ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, "orderrideritem.json"))

        // Resolved once per run instead of once per row, so every document of this run
        // (orders and their child rows) is sent to the same index name.
        // NOTE(review): assumes getDateYearESIndex is a side-effect-free lookup.
        final String targetIndex =
                supplierService.getDateYearESIndex(supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1)

        // Orders are serialized with the converter provided by the request; all other rows
        // use the injected JsonConvert.
        Closure orderJson = { order -> source.getConvert("BusinessOrder").convertTo(order) }
        Closure plainJson = { row -> jsonConvert.convertTo(row) }

        syncTable(dataSourceId, "select * from deBusinessOrder${supplierCode}_Current", BusinessOrder.class,
                targetIndex, ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, false, orderJson,
                "索引订单:", "索引订单共")

        syncTable(dataSourceId, "select * from deOrderDetailItem${supplierCode}_Current", OrderDetailItem.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, true, plainJson,
                "索引订单detailItem:", "索引订单共detailItem")

        syncTable(dataSourceId, "select * from deOrderDeliveryInfo${supplierCode}_Current", OrderDeliveryInfo.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, true, plainJson,
                "索引订单DeliveryInfo:", "索引订单共DeliveryInfo")

        syncTable(dataSourceId, "select * from deOrderFinances${supplierCode}_Current", OrderFinances.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, true, plainJson,
                "索引订单OrderFinances:", "索引订单共OrderFinances")

        syncTable(dataSourceId, "select * from deOrderDiscount${supplierCode}_Current", OrderDiscount.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_DISCOUNT_TYPE, true, plainJson,
                "索引订单OrderDiscount:", "索引订单OrderDiscount共")

        syncTable(dataSourceId, "select * from deOrderRiderItem${supplierCode}_Current", OrderRiderItem.class,
                targetIndex, ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, true, plainJson,
                "索引订单OrderRiderItem:", "索引订单共OrderRiderItem")

        return RetResult.successT()
    }

    /**
     * Pages through one table and bulk-indexes every page into Elasticsearch.
     *
     * @param dataSourceId  data source the query is executed against
     * @param sql           select statement for the table
     * @param entityType    class each row is mapped to
     * @param targetIndex   name of the index the documents are written to
     * @param esType        Elasticsearch type of the documents
     * @param childOfOrder  when {@code true}, each document's parent is set to the row's {@code idOrder}
     * @param toJson        closure turning one row into the JSON source of its document
     * @param failurePrefix text logged in front of the bulk failure message
     * @param successPrefix text logged in front of the indexed row count
     */
    private void syncTable(String dataSourceId, String sql, Class<?> entityType,
                           String targetIndex, String esType, boolean childOfOrder,
                           Closure toJson, String failurePrefix, String successPrefix) {
        ExecPageFlipper.execute(PAGE_SIZE, { PageFlipper page ->
            List<?> rows = dbService.getList(dataSourceId, sql, entityType, page)
            if (rows.isEmpty()) {
                return
            }

            // NOTE(review): an immediate refresh per bulk is expensive for a large copy;
            // kept as-is because callers may rely on the documents being searchable at once.
            BulkRequestBuilder bulk = esClient.getClient().prepareBulk()
                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)

            rows.each { row ->
                IndexRequestBuilder request = esClient.getClient().prepareIndex(targetIndex, esType)
                if (childOfOrder) {
                    request.setParent(String.valueOf(row.idOrder))
                }
                request.setId(String.valueOf(row.id))
                        .setSource(toJson(row), XContentType.JSON)
                bulk.add(request)
            }

            BulkResponse response = bulk.execute().actionGet()
            if (response.hasFailures()) {
                // Failures are only reported in the log; paging continues with the next page.
                logger.error(failurePrefix + response.buildFailureMessage())
            } else {
                logger.info(successPrefix + rows.size() + "条记录")
            }
        })
    }
}
|