BE_BusinessOrder_SyncDataBase2ES.groovy 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. import com.yinjie.heating.common.api.BusinessExecutor
  2. import com.yinjie.heating.common.api.SupplierInitService
  3. import com.yinjie.heating.common.datas.ERPModule
  4. import com.yinjie.heating.common.datas.ESKeys
  5. import com.yinjie.heating.common.entity.base.ProcessStringItem
  6. import com.yinjie.heating.common.entity.order.*
  7. import com.dySweetFishPlugin.elasticsearch.ESClient
  8. import com.dySweetFishPlugin.sql.DBService
  9. import com.sweetfish.convert.json.JsonConvert
  10. import com.sweetfish.service.RetResult
  11. import com.sweetfish.source.PageFlipper
  12. import com.sweetfish.util.Utility
  13. import org.apache.logging.log4j.LogManager
  14. import org.apache.logging.log4j.Logger
  15. import org.elasticsearch.action.bulk.BulkRequestBuilder
  16. import org.elasticsearch.action.bulk.BulkResponse
  17. import org.elasticsearch.action.index.IndexRequestBuilder
  18. import org.elasticsearch.action.support.WriteRequest
  19. import org.elasticsearch.common.xcontent.XContentType
  20. import javax.annotation.Resource
  /**
   * Data-sync script that reads the per-supplier {@code *_Current} order tables page by page
   * and bulk-indexes every row into Elasticsearch.
   *
   * <p>Six tables are copied: the business order itself plus its detail items, delivery info,
   * finances, discounts and rider items. Rows of the five satellite tables are indexed with the
   * owning order's id ({@code idOrder}) set as their parent.
   *
   * <p>NOTE(review): the index mapping list also registers
   * {@code ESKeys.ES_DELIVER_ORDER_AFTER_SALE_BILL_TYPE}, but no after-sale-bill table is synced
   * by this script — confirm whether that is intentional.
   */
  @SuppressWarnings(["unused"])
  class BE_BusinessOrder_SyncDataBase2ES implements BusinessExecutor<ProcessStringItem, ProcessStringItem> {

      /** First argument handed to {@code ExecPageFlipper.execute}; presumably the page size — confirm. */
      private static final int PAGE_SIZE = 500

      protected final Logger logger = LogManager.getLogger(this.getClass().getSimpleName())

      @Resource
      SupplierInitService supplierService

      @Resource
      ESClient esClient

      @Resource
      JsonConvert jsonConvert

      @Resource
      DBService dbService

      /** @return the display name under which this script is registered */
      @Override
      String scriptName() {
          return "外卖订单-ES同步数据库"
      }

      /** @return the ERP module this script belongs to ({@code DATA_SYNC}) */
      @Override
      ERPModule module() {
          return ERPModule.DATA_SYNC
      }

      /**
       * Makes sure the target index exists, then copies all six order tables into it.
       *
       * <p>Bulk failures are only logged; the method reports success regardless.
       *
       * @param source carries the data-source id, the supplier code and the named
       *               {@code "BusinessOrder"} converter used to serialize order rows
       * @return always {@code RetResult.successT()}
       */
      @Override
      RetResult<ProcessStringItem> execute(ProcessStringItem source) {
          String dataSourceId = source.getDataSourceId()
          long supplierCode = source.getSupplierCode()

          // NOTE(review): the index is checked with last argument 0, while documents below are
          // written to the index resolved with last argument 1 — confirm both resolve as intended.
          esClient.checkIndexEx(supplierService.getDateYearESIndex(supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 0),
                  ESKeys.INDEX_CONFIG,
                  Utility.ofMap(ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, "businessorder.json",
                          ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, "orderdeliveryinfo.json",
                          ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, "orderfinances.json",
                          ESKeys.ES_DELIVER_ORDER_DISCOUNT_TYPE, "orderdiscount.json",
                          ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, "orderdetailitem.json",
                          ESKeys.ES_DELIVER_ORDER_AFTER_SALE_BILL_TYPE, "orderaftersalebill.json",
                          ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, "orderrideritem.json"))

          // Resolved once per run instead of once per row; every document of this run targets it.
          // Assumes getDateYearESIndex is a pure lookup — TODO confirm.
          String targetIndex = supplierService.getDateYearESIndex(supplierCode, ESKeys.ES_DELIVER_BUSINESS_ORDER_INDEX, 1)

          // Orders are serialized with the converter named "BusinessOrder" supplied by the request;
          // all satellite rows use the injected default converter.
          Closure orderJson = { row -> source.getConvert("BusinessOrder").convertTo(row) }
          Closure childJson = { row -> jsonConvert.convertTo(row) }

          // The two trailing strings are the log prefixes; their wording differs slightly between
          // tables and is kept exactly as it was.
          syncTable(dataSourceId, "select * from deBusinessOrder${supplierCode}_Current", BusinessOrder.class,
                  targetIndex, ESKeys.ES_DELIVER_BUSINESS_ORDER_TYPE, false, orderJson,
                  "索引订单:", "索引订单共")

          syncTable(dataSourceId, "select * from deOrderDetailItem${supplierCode}_Current", OrderDetailItem.class,
                  targetIndex, ESKeys.ES_DELIVER_ORDER_DETAIL_ITEM_TYPE, true, childJson,
                  "索引订单detailItem:", "索引订单共detailItem")

          syncTable(dataSourceId, "select * from deOrderDeliveryInfo${supplierCode}_Current", OrderDeliveryInfo.class,
                  targetIndex, ESKeys.ES_DELIVER_ORDER_DELIVERY_INFO_TYPE, true, childJson,
                  "索引订单DeliveryInfo:", "索引订单共DeliveryInfo")

          syncTable(dataSourceId, "select * from deOrderFinances${supplierCode}_Current", OrderFinances.class,
                  targetIndex, ESKeys.ES_DELIVER_ORDER_FINANCES_TYPE, true, childJson,
                  "索引订单OrderFinances:", "索引订单共OrderFinances")

          syncTable(dataSourceId, "select * from deOrderDiscount${supplierCode}_Current", OrderDiscount.class,
                  targetIndex, ESKeys.ES_DELIVER_ORDER_DISCOUNT_TYPE, true, childJson,
                  "索引订单OrderDiscount:", "索引订单OrderDiscount共")

          syncTable(dataSourceId, "select * from deOrderRiderItem${supplierCode}_Current", OrderRiderItem.class,
                  targetIndex, ESKeys.ES_DELIVER_ORDER_RIDER_ITEM_TYPE, true, childJson,
                  "索引订单OrderRiderItem:", "索引订单共OrderRiderItem")

          return RetResult.successT()
      }

      /**
       * Pages through one table and bulk-indexes each non-empty page into Elasticsearch.
       *
       * @param dataSourceId  data source the query runs against
       * @param sql           full select statement for the table
       * @param rowType       entity class each row is mapped to
       * @param indexName     Elasticsearch index receiving the documents
       * @param esType        Elasticsearch mapping type of the documents
       * @param hasParent     when true, each document's parent is set to the row's {@code idOrder}
       * @param toJson        closure turning one row into the document source passed to {@code setSource}
       * @param failurePrefix text logged in front of the bulk failure message
       * @param successPrefix text logged in front of the indexed row count
       */
      private void syncTable(String dataSourceId, String sql, Class rowType, String indexName, String esType,
                             boolean hasParent, Closure toJson, String failurePrefix, String successPrefix) {
          ExecPageFlipper.execute(PAGE_SIZE, { PageFlipper p ->
              List rows = dbService.getList(dataSourceId, sql, rowType, p)
              if (!rows.isEmpty()) {
                  // NOTE(review): an IMMEDIATE refresh is requested on every bulk request; kept as-is.
                  BulkRequestBuilder bulkRequest = esClient.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                  rows.each { row ->
                      IndexRequestBuilder esRequest = esClient.getClient().prepareIndex(indexName, esType)
                      if (hasParent) {
                          esRequest.setParent(String.valueOf(row.idOrder))
                      }
                      esRequest.setId(String.valueOf(row.id))
                              .setSource(toJson.call(row), XContentType.JSON)
                      bulkRequest.add(esRequest)
                  }
                  BulkResponse bulkResponse = bulkRequest.execute().actionGet()
                  if (bulkResponse.hasFailures()) {
                      logger.error(failurePrefix + bulkResponse.buildFailureMessage())
                  } else {
                      logger.info(successPrefix + rows.size() + "条记录")
                  }
              }
          })
      }
  }