| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import com.yinjie.heating.common.api.NoSqlKeysService
- import com.yinjie.heating.common.datas.ESKeys
- import com.dySweetFishPlugin.elasticsearch.ESClient
- import org.apache.logging.log4j.Logger
- import org.elasticsearch.action.ActionListener
- import org.elasticsearch.index.query.BoolQueryBuilder
- import org.elasticsearch.index.query.QueryBuilders
- import org.elasticsearch.index.reindex.BulkByScrollResponse
- import org.elasticsearch.index.reindex.UpdateByQueryAction
- import org.elasticsearch.script.Script
- import org.elasticsearch.script.ScriptType
/**
 * Builds the periodic task that flags expired system broadcast messages.
 *
 * Each run issues an asynchronous update-by-query against the index resolved from
 * {@code ESKeys.ESERPSYSMSG_INDEX} for the given supplier. It sets {@code expireFlag}
 * to 1 on every document of type {@code ESKeys.ESERPDEFAULT_TYPE} whose
 * {@code expireFlag} is still 0 and whose {@code expireTime} is at or before the
 * current time.
 *
 * Per the original note on this script, the task is intended to run once about one
 * minute after system start and then every five minutes; that scheduling is done by
 * the caller, not here. Only system broadcast messages carry an expiry setting;
 * point-to-point messages do not.
 *
 * @param esClient     wrapper whose {@code getClient()} supplies the Elasticsearch client
 * @param keysService  resolves the concrete index name for the supplier
 * @param logger       receives the updated-document count and any failure
 * @param supplierCode supplier whose message index is processed
 * @return a {@link Runnable} that performs one expiry pass per invocation
 */
static def createRunnable(ESClient esClient, NoSqlKeysService keysService, Logger logger, long supplierCode) {
    return {
        // Evaluated inside the closure so every run compares against the time of that run.
        long now = System.currentTimeMillis()
        String index = keysService.getESKey(ESKeys.ESERPSYSMSG_INDEX, supplierCode)

        // A message is expired once its expireTime is at or before "now". Using gte here
        // would instead select messages that have NOT expired yet and flag them early.
        // NOTE(review): if some documents use a sentinel such as expireTime = 0 to mean
        // "never expires", add a lower bound (e.g. .gt(0)) — confirm against the writers.
        BoolQueryBuilder expiredButUnflagged = QueryBuilders.boolQuery()
                .must(QueryBuilders.rangeQuery("expireTime").lte(now))
                .must(QueryBuilders.termQuery("expireFlag", 0))
                .must(QueryBuilders.typeQuery(ESKeys.ESERPDEFAULT_TYPE))

        // Bulk update: version conflicts are skipped rather than aborting the whole request,
        // and the index is refreshed once the update completes.
        UpdateByQueryAction.INSTANCE.newRequestBuilder(esClient.getClient())
                .refresh(true)
                .abortOnVersionConflict(false)
                .source(index)
                .script(new Script(
                        ScriptType.INLINE,
                        "painless",
                        "ctx._source.expireFlag = 1",
                        Collections.emptyMap()
                ))
                .filter(expiredButUnflagged)
                .execute(new ActionListener<BulkByScrollResponse>() {
                    // Completion callback: only log when something was actually changed.
                    @Override
                    public void onResponse(BulkByScrollResponse response) {
                        long updated = response.getUpdated()
                        if (updated > 0L) {
                            logger.info("修改过期:" + updated)
                        }
                    }

                    // Failure callback: log with the stack trace; the next run will retry.
                    @Override
                    public void onFailure(Exception e) {
                        logger.error(e.getMessage(), e)
                    }
                })
    } as Runnable
}
// Hand the Runnable back to whoever evaluated this script. esClient, keysService, logger
// and supplierCode are not declared in this file — presumably they are supplied through
// the script binding by the host; confirm against the caller.
return createRunnable(esClient, keysService, logger, supplierCode)
|