• 三狮军团首秀 只有两千多球迷观战 2019-05-19
  • 人民网2017呼和浩特徒步迎新活动--内蒙古频道--人民网 2019-05-19
  • 【品牌资讯】环球网斩获“全国行业新闻网站传播力2017年6月榜”多项冠军 2019-05-15
  • 深化对经济工作主线的认识 从供需关系看供给侧结构性改革 2019-05-15
  • 格拉斯哥艺术学院起火 4年前曾遭火灾仍在整修 2019-05-14
  • 回复@地瓜干17世:猪临死才会嚎叫呢~ 2019-05-14
  • 婺源古村溪中发现鹰嘴龟 2019-05-08
  • 编辑评测:高夫净源控油平衡露 极速补水长效控油 2019-05-08
  • 四部门发文规范特色小镇建设防止“新瓶装旧酒” 2019-05-02
  • 【地球的盛会文明的聚会艺术的盛宴四海一家足球为人类和平幸福而荣耀!!!普京是当今人类世界最优秀的一代伟人俄罗斯赢啦!!!】 2019-04-29
  • 学习新思想,千万师生同上一堂课 2019-04-28
  • 你这种个体户都干不了的老蚕也配谈计划?真是笑死人不偿命哦? 2019-04-23
  • 感人!的哥带着患病父亲出车 孝心感动乘客 2019-04-23
  • 图解:习近平在纪念马克思诞辰200周年大会上讲话的16个金句 2019-04-16
  • 感触名家笔下的端午文化 吃香粽原来可以这样"文艺" 2019-04-16
  • 快乐十分开奖结果查询:利用Redis实现延时处理的方法实例

    山西体彩11选5直选遗漏 www.caxru.com  更新时间:2019年03月10日 15:45:57   作者:我一定会有猫的   我要评论

    这篇文章主要给大家介绍了关于利用Redis实现延时处理的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者使用Redis具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧

    背景

    在开发中,往往会遇到一些关于延时任务的需求。例如

    •生成订单30分钟未支付,则自动取消

    •生成订单60秒后,给用户发短信

    对上述的任务,我们给一个专业的名字来形容,那就是延时任务。

    最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。

    实现过程

    说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:

    • Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。
    • Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。
    • Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在 afterExecute() 回调方法中进行处理),所以更加安全。

    1、ScheduledThreadPoolExecutor决定了用ScheduledThreadPoolExecutor来进行实现,接下来就是代码编写啦(大体流程代码)。

    主要的延时实现如下:

    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new 
    ThreadPoolExecutor.AbortPolicy());
    //从消息中取出延迟时间及相关信息的代码略
    int delayTime = 0;
    executorService.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
       //具体操作逻辑
      }},0,delayTime, TimeUnit.SECONDS);

    其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。

    然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。

    大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下ScheduledThreadPoolExecutor的源码:

    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
     super(corePoolSize, Integer.MAX_VALUE, 0, 
     TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

    ScheduledThreadPoolExecutor由于它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像我们平时使用的SingleThreadExecutor等构造是可以使用自己定义的LinkedBlockingQueue并且设置队列大小,问题就出在这里。

    DelayWrokQueue是一个无界队列,而我们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致OOM,在使用多线程时我们是肯定要考虑到OOM的可能性的,因为OOM带来的后果往往比较严重,系统OOM临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。

    2、采用redis和线程结合

    这一次换了思路,采用redis来帮助我们做缓冲,从而避免消息过多OOM的问题。

    相关redis zset api:

    //添加元素
    ZADD key score member [[score member] [score member] …]
    //根据分值及限制数量查询
    ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
    //从zset中删除指定成员
    ZREM key member [member …]

    我们采用redis基础数据结构的zset结构,采用score来存储我们目标发送时间的数值,整体处理流程如下:

    • 第一步数据存储:9:10分从kafka接收了一条a的订单消息,要求30分钟后进行发货通知,那我们就将当前时间加上30分钟然后转为时间戳作为a的score,key为a的订单号存入redis中。代码如下:
    public void onMessage(String topic, String message) {
      String orderId;
    		int delayTime = 0;
      try {
       Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
       }.getType());
       if (msgMap.isEmpty()) {
        return;
       }
       LOGGER.info("onMessage kafka content:{}", msgMap.toString());
    	 orderId = msgMap.get("orderId");
       if(StringUtils.isNotEmpty(orderId)){
        delayTime = Integer.parseInt(msgMap.get("delayTime"));
        Calendar calendar = Calendar.getInstance();
        //计算出预计发送时间
        calendar.add(Calendar.MINUTE, delayTime);
        long sendTime = calendar.getTimeInMillis();
        RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
        LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime);
       }
      } catch (Exception e) {
       LOGGER.info("onMessage 延时发送异常:{}", e);
      }
     }
    • 第二步数据处理:另起一个线程具体调度时间根据业务需求来定,我这里3分钟执行一次,内部逻辑:从redis中取出一定量的zset数据,如何取呢,使用zset的zrangeByScore方法,根据数据的score进行排序,当然可以带上时间段,这里从0到现在,来进行消费,需要注意的一点是,在取出数据后我们需要用zrem方法将取出的数据从zset中删除,防止其他线程重复消费数据。在此之后进行接下来的发货通知等相关逻辑。代码如下:
    public void run(){
      //获取批量大小
      int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
      try {
       //批量获取离发送时间最近的orderNum条数据
    	 Calendar calendar = Calendar.getInstance();
    	 long now = calendar.getTimeInMillis();
    	 //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况)
    	 Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
    	 LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
       if (CollectionUtils.isNotEmpty(orders)){
        //删除key 防止重复发送
        for (String orderId : orderIds) {
         RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
        }
    	  //接下来执行发送等业务逻辑     
       }
      } catch (Exception e) {
       LOGGER.warn("task.run exception:{}", e);
      }
     }

    至此完成了依赖redis和线程完成了延时发送的功能。

    结语

    那么对上面两种不同的实现方式进行一下优缺点比较:

    • 第一种方式实现简单,不依赖外部组件,能够快速的实现目标功能,但缺点也很明显,需要在特定的场景下使用,如果是我这种消息量大的情况下使用很可能是有问题,当然在数据源消息不多的情况下不失为好的选择。
    • 第二种方式实现稍微复杂一点,但是能够适应消息量大的场景,采用redis的zset作为了“中间件”的效果,并且帮助我们进行延时的功能实现能够较好的适应高并发场景,缺点在于在编写的过程中需要考虑实际的因素较多,例如线程的执行周期时间,发送可能会有一定时间的延迟,批量数据大小的设置等等。

    综上是本人这次延时功能的实现过程的两种实现方式的总结,具体采用哪种方式还需大家根据实际情况选择,希望能给大家带来帮助。ps:由于本人的技术能力有限,文章中可能出现技术描述不准确或者错误的情况恳请各位大佬指出,我立马进行改正,避免误导大家,谢谢!

    总结

    以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对脚本之家的支持。

    相关文章

    • 使用java的注解(用在java类的方法上的注解)方法

      使用java的注解(用在java类的方法上的注解)方法

      这篇文章主要介绍了使用java的注解(用在java类的方法上的注解)方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
      2019-03-03
    • mybatis简介与配置_动力节点Java学院整理

      mybatis简介与配置_动力节点Java学院整理

      这篇文章主要介绍了mybatis简介与配置,介绍了MyBatis+Spring+MySql简单配置,有兴趣的可以了解一下
      2017-09-09
    • java微信公众号企业付款开发

      java微信公众号企业付款开发

      这篇文章主要为大家详细介绍了java微信公众号企业付款开发,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
      2018-09-09
    • Spring Cloud原理详解

      Spring Cloud原理详解

      今天小编就为大家分享一篇关于Spring Cloud原理详解,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
      2019-02-02
    • Spring MVC 拦截器实现代码

      Spring MVC 拦截器实现代码

      本篇文章主要介绍了Spring MVC 拦截器的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。
      2017-02-02
    • SpringBoot中的内容协商器图解

      SpringBoot中的内容协商器图解

      本文通过图文解说加代码的形式给大家介绍了SpringBoot中的内容协商器知识,需要的朋友参考下吧
      2017-11-11
    • 使用Post方式提交数据到Tomcat服务器的方法

      使用Post方式提交数据到Tomcat服务器的方法

      这篇将介绍使用Post方式提交数据到服务器,由于Post的方式和Get方式创建Web工程是一模一样的,只用几个地方的代码不同,这篇文章主要介绍了使用Post方式提交数据到Tomcat服务器的方法,感兴趣的朋友一起学习吧
      2016-04-04
    • Java中对象的序列化详解及实例

      Java中对象的序列化详解及实例

      这篇文章主要介绍了 Java中对象的序列化详解及实例的相关资料,需要的朋友可以参考下
      2017-04-04
    • 使用HttpClient调用接口的实例讲解

      使用HttpClient调用接口的实例讲解

      下面小编就为大家带来一篇使用HttpClient调用接口的实例讲解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
      2017-10-10
    • java发起http请求获取返回的Json对象方法

      java发起http请求获取返回的Json对象方法

      下面小编就为大家分享一篇java发起http请求获取返回的Json对象方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
      2018-03-03

    最新评论

  • 三狮军团首秀 只有两千多球迷观战 2019-05-19
  • 人民网2017呼和浩特徒步迎新活动--内蒙古频道--人民网 2019-05-19
  • 【品牌资讯】环球网斩获“全国行业新闻网站传播力2017年6月榜”多项冠军 2019-05-15
  • 深化对经济工作主线的认识 从供需关系看供给侧结构性改革 2019-05-15
  • 格拉斯哥艺术学院起火 4年前曾遭火灾仍在整修 2019-05-14
  • 回复@地瓜干17世:猪临死才会嚎叫呢~ 2019-05-14
  • 婺源古村溪中发现鹰嘴龟 2019-05-08
  • 编辑评测:高夫净源控油平衡露 极速补水长效控油 2019-05-08
  • 四部门发文规范特色小镇建设防止“新瓶装旧酒” 2019-05-02
  • 【地球的盛会文明的聚会艺术的盛宴四海一家足球为人类和平幸福而荣耀!!!普京是当今人类世界最优秀的一代伟人俄罗斯赢啦!!!】 2019-04-29
  • 学习新思想,千万师生同上一堂课 2019-04-28
  • 你这种个体户都干不了的老蚕也配谈计划?真是笑死人不偿命哦? 2019-04-23
  • 感人!的哥带着患病父亲出车 孝心感动乘客 2019-04-23
  • 图解:习近平在纪念马克思诞辰200周年大会上讲话的16个金句 2019-04-16
  • 感触名家笔下的端午文化 吃香粽原来可以这样"文艺" 2019-04-16
  • 福彩3d预测 北京赛车pk10改单 七星彩开奖公告 福彩3d图谜总汇 重庆时时彩 排列5摇奖 衡阳市福彩中心电话 江西多乐彩开奖走势图 极速时时彩计划网 福利彩票3d试机号 北京pk10高手技术分享 七乐彩几几年发行 福彩七乐彩走势图-综合版 15选5开奖 时时彩缩水app 江西新时时彩预测