Schedule_1_60_300_SysMsgExpire.groovy 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. import com.yinjie.heating.common.api.NoSqlKeysService
  2. import com.yinjie.heating.common.datas.ESKeys
  3. import com.dySweetFishPlugin.elasticsearch.ESClient
  4. import org.apache.logging.log4j.Logger
  5. import org.elasticsearch.action.ActionListener
  6. import org.elasticsearch.index.query.BoolQueryBuilder
  7. import org.elasticsearch.index.query.QueryBuilders
  8. import org.elasticsearch.index.reindex.BulkByScrollResponse
  9. import org.elasticsearch.index.reindex.UpdateByQueryAction
  10. import org.elasticsearch.script.Script
  11. import org.elasticsearch.script.ScriptType
  /**
   * Builds the scheduled task that flags expired system broadcast messages.
   *
   * Scheduling note carried over from the original header comment: the task is
   * meant to run once one minute after system start-up and then every five
   * minutes. The schedule itself is configured outside this script.
   *
   * Each run issues an asynchronous Elasticsearch update-by-query that sets
   * {@code expireFlag = 1} on every document of the supplier's system-message
   * index whose {@code expireTime} has been reached and whose
   * {@code expireFlag} is still 0.
   *
   * @param esClient     wrapper exposing the Elasticsearch client via {@code getClient()}
   * @param keysService  resolves the concrete index name for the supplier
   * @param logger       receives the updated-document count and any failure
   * @param supplierCode supplier whose system-message index is processed
   * @return a {@link Runnable} performing one expiry pass per invocation
   */
  static def createRunnable(ESClient esClient, NoSqlKeysService keysService, Logger logger, long supplierCode) {
    return {
      // Only system broadcast messages carry an expiry setting; point-to-point messages do not.
      long now = System.currentTimeMillis()
      String index = keysService.getESKey(ESKeys.ESERPSYSMSG_INDEX, supplierCode)

      // A message is expired once its expireTime is at or before "now".
      // The previous gte(now) matched messages that had NOT yet expired, flagging
      // them early and never flagging the ones that actually were past due.
      // NOTE(review): assumes expireTime is stored as epoch milliseconds and that
      // there is no "0 = never expires" sentinel — confirm against the writer of
      // this index; if such a sentinel exists, add a lower bound (e.g. gt(0)).
      BoolQueryBuilder expiredUnflagged = QueryBuilders.boolQuery()
          .must(QueryBuilders.rangeQuery("expireTime").lte(now))
          .must(QueryBuilders.termQuery("expireFlag", 0))
          .must(QueryBuilders.typeQuery(ESKeys.ESERPDEFAULT_TYPE))

      // Bulk update: flag every matching document in a single update-by-query request.
      UpdateByQueryAction.INSTANCE.newRequestBuilder(esClient.getClient())
          .refresh(true)
          // Do not abort the whole request when a document changed concurrently.
          .abortOnVersionConflict(false)
          .source(index)
          .script(new Script(
              ScriptType.INLINE,
              "painless",
              "ctx._source.expireFlag = 1",
              Collections.emptyMap()
          ))
          .filter(expiredUnflagged)
          .execute(new ActionListener<BulkByScrollResponse>() {
            // Completion callback: log only when something was actually updated.
            @Override
            public void onResponse(BulkByScrollResponse response) {
              if (response.getUpdated() > 0L) {
                logger.info("修改过期:" + response.getUpdated())
              }
            }

            // Failure callback: log the exception together with its stack trace.
            @Override
            public void onFailure(Exception e) {
              logger.error(e.getMessage(), e)
            }
          })
    } as Runnable
  }
  // The value produced here is what the script hands back to whoever evaluates it
  // (Groovy would also return the last expression implicitly).
  return createRunnable(esClient, keysService, logger, supplierCode)