SSE(Server Sent Event)服务端推送
SSE
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是**以流信息的方式**,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持
功能需求:
之前有个项目初期上线,需要给几家试点单位上线试用,然后他们就不停的提问题,我们就不停的改,所以导致项目需要频繁更新
因为是试用,当时只部署了一个服务.因此组长就说干脆页面弄个公告功能,每次更新前给再用的用户提示弹窗,几点更新,心里有个数
方案
当时我并不清楚怎么搞,就问了下朋友,他给我推荐的是WebSocket,百度了一下感觉好像可行,但就是觉得有点太复杂了,就一个小功能.于是乎,后面又通过大佬了解到了一个比较冷门的服务端推送技术,sse.我当时就觉得是它了
和WebSocket的区别
详细的区别就看头顶的推荐文章吧,我这里觉得比较重要的就是两点
1,SSE 属于轻量级,单向通道 (服务端向客户端推送消息), 而WebSocket协议较为复杂,功能更强大是双向通道 (服务端和客户端互相都可以推送消息. 举例:在线客服)
2,IE、Edge不支持SSE.(当然,时代在更新,现在不知道是否支持,下面有测试是否支持说明)
实现
这里既然明确了自己要的东西,就开始使用吧,这里分为两部分,前端部分和后端部分;
大概思路:前端定义好一个sse对象实例,用作事件触发(这个事件需要定义在框架最外层,这样不管用户在哪个子页面都能收到提示).同时它具备自动检测断开并重连.后端用quartz定时任务加上sse,在指定的时间去触发sse,进行消息推送
直接上代码:
前端部分:
//后端接口,进行初始化
var url = "/xxx/initEvent";
var source = new EventSource(url);
$(function () {
// 连接时触发事件
// source.onopen=function (ev) {
// layer.alert("onopen");
// };
// 后端调用send时触发
source.onmessage = function (ev) {
layer.alert(ev.data);
//隔五分钟再次提醒
setTimeout(function () {
layer.alert(ev.data);
}, 5 1000 60);
};
//轮询检测连接状态,失败时触发
source.onerror = function (ev) {
console.log("连接失败:" + ev);
//这里直接关闭重连,根据业务决定
source.close();
};
})
后端部分:
这里需要注意:每一个sse对象是对应的一个连接,也就是用户,所有这里初始化的时候,是根据用户ip作为key,sse作为value存储起来,发送的时候,就遍历调用sse实例的send方法进行消息推送.
这里如果用不到其他的东西,其实就可以直接调用sseEmitter.send(),直接进行消息推送就行了,是不是超级简单
@RequestMapping("/initEvent")
public SseEmitter initEvent(HttpServletRequest request) {
//获取ip
String ipAddress = IPUtil.getIpAddress(request);
//如果设置超时(单位:毫秒),定期重新连接以获取新对象,
// 每次就需要返回新的对象建立连接,
// 设置超时时间
//SseEmitter sseEmitter = new SseEmitter(30 60 1000L);
SseEmitter sseEmitter = new SseEmitter(0L);//此处设置不超时,如果用户关闭页面前端设置直接断开连接,避免一直重复连接耗性能
log.info("=======用户ip:{}=======",ipAddress);
//定义一个公用的map方便调用,需要存储(这里可以用redis),因为向前端推送信息时需要获取当前对象
ToolUtil.sseCach.put(ipAddress, sseEmitter);
//记录用户在线时间
//ToolUtil.timeMap.put(ipAddress,new Date());
//用户超时的话,过滤掉关闭页面的用户连接,默认设置40分钟,需要比超时时间长,才能判断用户是否真的已经失效
//ToolUtil.timeOut();
return sseEmitter;
}
以下是结合定时任务使用,定时任务提供了比较全面的功能,除了创建和删除,还有暂停,唤醒,修改等
实体
@Date
public class JobEntity implements Serializable {
private String jobId; //唯一id
private String className; //定时任务示例的 class路径
private String cronExpression; //cron表达式
private String jobName; //定时任务名称
private String jobGroup; //所属组
private String triggerName; //触发器名称
private String triggerGroup; //触发器组
private String description; //备注
private String ggnr; //备注
private String data; //携带参数
/**
* 预留的数据库字段 如果任务信息选择手动自己存入数据库的话,会使用到
*/
private Boolean pauseStatus; //是否暂停
private Boolean deleteStatus; //是否有效
private Date createTime; //创建时间
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date startTime; //开始时间
}
相关定时配置
@Component
public class JobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory beanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle)
throws Exception {
Object jobInstance = super.createJobInstance(bundle);
//Job实例注入到Job工厂
beanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
@Configuration
public class QuartzConfig {
@Autowired
private JobFactory jobFactory;
/**
* 调度类FactoryBean
* @return
* @throws IOException
*/
@Bean("schedulerFactory")
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
//设置调度类quartz属性
schedulerFactoryBean.setQuartzProperties(quartzProperties());
//设置jobFactory
schedulerFactoryBean.setJobFactory(jobFactory);
return schedulerFactoryBean;
}
/**
* 解析quartz.properties文件,填充属性
* @return
* @throws IOException
*/
@Bean
public Properties quartzProperties() throws IOException{
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
//若不做额外配置,会有默认的配置文件加载 在jar org.quartz里面 有一份quartz.properties
//propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
propertiesFactoryBean.afterPropertiesSet();
return propertiesFactoryBean.getObject();
}
/**
* quartz初始化监听器
* @return
*/
@Bean
public QuartzInitializerListener initializerListener(){
return new QuartzInitializerListener();
}
/**
* 根据调度类工厂bean获取调度
* @return
* @throws IOException
*/
@Bean("scheduler")
public Scheduler scheduler() throws IOException{
return schedulerFactoryBean().getScheduler();
}
}
Controller
@RequestMapping("/add")
@ResponseBody
public String addJob(JobEntity job, HttpServletRequest request) {//此处可以优化定时任务的时间,执行多个或者时间段内循环
//时间不能是在当前时间之前,否则不允许创建
if (!job.getStartTime().before(new Date())) {
//赋值初始信息
JobEntity initJob = ToolUtil.init(job);
//创建定时任务
Boolean result = quartzService.addJob(initJob, request);
if (!result) {
return "创建失败";
}
//存入数据库
int i = quartzDbService.insertQuartz(initJob);
return "ok";
}
return "开始时间不能再当前时间之前";
}
@RequestMapping("/delete")
@ResponseBody
public String deleteJob(JobEntity job) {
//删除程序中的定时任务
Boolean result = quartzService.deleteJob(job);//e3bf33dc-8fd7-40f0-8b20-910848b2c783noticeJobName
if (!result) {
return "删除定时任务失败";
}
//删除库里面的记录
int i = quartzDbService.delQuartz(job.getJobId());
return "删除定时任务成功:" + job.getJobId() + ":" + job.getJobName();
}
@RequestMapping("/update")
@ResponseBody
public String updateJob(JobEntity job) {
if (!job.getStartTime().before(new Date())) {
Boolean result = quartzService.updateJob(job);
if (!result) {
return "页面已失效,请刷新";
}
job.setCreateTime(new Date());
int i = quartzDbService.editQuartz(job);
return "ok";
}
return "开始时间不能再当前时间之前";
}
@RequestMapping("/run")
@ResponseBody
public String runJob(@RequestBody JobEntity job) {
Boolean result = quartzService.runJob(job);
if (!result) {
return "启动定时任务失败";
}
return "启动定时任务成功:" + job.getJobId() + job.getJobName();
}
@RequestMapping("/pause")
@ResponseBody
public String pauseJob(@RequestBody JobEntity job) {
Boolean result = quartzService.pauseJob(job);
if (!result) {
return "暂停定时任务失败";
}
return "暂停定时任务成功:" + job.getJobId() + job.getJobName();
}
@RequestMapping("/resume")
@ResponseBody
public String resumeJob(@RequestBody JobEntity job) {
Boolean result = quartzService.resumeJob(job);
if (!result) {
return "唤醒定时任务失败";
}
return "唤醒定时任务成功:" + job.getJobId() + job.getJobName();
}
@RequestMapping("/query")
@ResponseBody
public String queryJob(@RequestBody JobEntity job) {
JSONObject result = quartzService.queryJob(job);
if (null == result) {
return "不存在对应的任务:" + job.getJobId() + job.getJobName();
}
return result.toString();
}
Server
public interface QuartzService {
/**
* 创建Job
* @param job
*/
Boolean addJob(JobEntity job, HttpServletRequest request );
/**
* 执行Job
* @param job
*/
Boolean runJob(JobEntity job);
/**
* 修改Job
* @param job
*/
Boolean updateJob(JobEntity job);
/**
* 暂定Job
* @param job
*/
Boolean pauseJob(JobEntity job);
/**
* 唤醒Job
* @param job
*/
Boolean resumeJob(JobEntity job);
/**
* 删除Job
* @param job
*/
Boolean deleteJob(JobEntity job);
/**
* 获取Job
* @param job
*/
JSONObject queryJob(JobEntity job);
}
@Service
public class QuartzServiceImpl implements QuartzService {
private static Logger log = LoggerFactory.getLogger(QuartzServiceImpl.class);
@Autowired
@Qualifier("scheduler")
private Scheduler scheduler;
@Override
public Boolean addJob(JobEntity job, HttpServletRequest request) {
try {
// JSONObject data = job.getData();
log.info("当前任务携带的业务参数={}", job.getData());
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("myValue", job.getData());
//jobDataMap.put("request", request);
String jobId = job.getJobId();
String jobName = job.getJobName();
String jobUnique = jobId + jobName;
JobDetail jobDetail = JobBuilder
.newJob((Class<? extends Job>) Class.forName(job.getClassName()))
// 指定执行类
.withIdentity(jobUnique, job.getJobGroup())
// 指定name和group
.requestRecovery().withDescription(job.getDescription())
.setJobData(jobDataMap)
.build();
// 创建表达式调度构建器
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
.cronSchedule(job.getCronExpression());
// 创建触发器
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity(job.getTriggerName(), job.getTriggerGroup())
.withDescription(job.getDescription())
.withSchedule(cronScheduleBuilder).build();
scheduler.scheduleJob(jobDetail, cronTrigger);
scheduler.start();
log.info("定时任务[{}]创建成功,开始执行", jobId + jobName);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public Boolean runJob(JobEntity job) {
try {
String jobId = job.getJobId();
String jobName = job.getJobName();
String jobUnique = jobId + jobName;
scheduler.triggerJob(JobKey.jobKey(jobUnique,
job.getJobGroup()));
log.info("定时任务[{}]执行成功", jobUnique);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public Boolean updateJob(JobEntity job) {
try {
String jobId = job.getJobId();
String jobName = job.getJobName();
String jobUnique = jobId + jobName;
TriggerKey triggerKey = new TriggerKey(job.getTriggerName(),
job.getTriggerGroup());
CronTrigger cronTrigger = (CronTrigger) scheduler
.getTrigger(triggerKey);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
.cronSchedule(job.getCronExpression());
// 重新构件表达式
CronTrigger trigger = cronTrigger.getTriggerBuilder()
.withIdentity(triggerKey).withSchedule(cronScheduleBuilder)
.withDescription(job.getDescription())
.build();
scheduler.rescheduleJob(triggerKey, trigger);
log.info("定时任务[{}]更新成功", jobUnique);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public Boolean deleteJob(JobEntity job) {
try {
String jobId = job.getJobId();
String jobName = job.getJobName();
String jobUnique = jobId + jobName;
scheduler.deleteJob(JobKey.jobKey(jobUnique, job.getJobGroup()));
log.info("定时任务[{}]删除成功", jobUnique);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public Boolean pauseJob(JobEntity job) {
try {
String jobId = job.getJobId();
String jobName = job.getJobName();
String jobUnique = jobId + jobName;
scheduler.pauseJob(JobKey.jobKey(jobUnique,
job.getJobGroup()));
log.info("定时任务[{}]暂停成功", jobUnique);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public Boolean resumeJob(JobEntity job) {
try {
String jobId = job.getJobId();
String jobName = job.getJobName();
String jobUnique = jobId + jobName;
scheduler.resumeJob(JobKey.jobKey(jobUnique,
job.getJobGroup()));
log.info("定时任务[{}]唤醒成功", jobUnique);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public JSONObject queryJob(JobEntity job) {
TriggerKey triggerKey = new TriggerKey(job.getTriggerName(),
job.getTriggerGroup());
try {
CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (null == cronTrigger) {
return null;
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("expression", cronTrigger.getCronExpression());
jsonObject.put("state", scheduler.getTriggerState(triggerKey));
jsonObject.put("description", cronTrigger.getDescription());
return jsonObject;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
工具类
public class ToolUtil {
public static Map<String, SseEmitter> sseCach = new ConcurrentHashMap<>();
public static Map<String, Date> timeMap = new HashMap();
static Logger logger = LoggerFactory.getLogger(ToolUtil.class);
//工具
public static JobEntity init(JobEntity job) {
//取时间拼接cron表达式
Calendar calendar = Calendar.getInstance();
calendar.setTime(job.getStartTime());
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH) + 1;
int day = calendar.get(Calendar.DATE);
int hours = calendar.get(Calendar.HOUR_OF_DAY);
int minutes = calendar.get(Calendar.MINUTE);
int seconds = calendar.get(Calendar.SECOND);
//定时任务时间转换
StringBuffer cron = new StringBuffer();
cron.append(seconds).append(" ")
.append(minutes).append(" ")
.append(hours).append(" ")
.append(day).append(" ")
.append(month).append(" ")
.append("?").append(" ")
.append(year).append("-").append(year);
String uuid = UUID.randomUUID().toString();
job.setCreateTime(new Date());
job.setJobId(uuid);
job.setClassName("com.xxx.MyJobFirst");
job.setJobName("noticeJobName");
job.setJobGroup("noticeJobGroup");
//触发器需要是唯一,否则只能走修改
job.setTriggerName("noticeTriggerName:" + UUID.randomUUID().toString());
job.setTriggerGroup("noticeTriggerGroup:" + UUID.randomUUID().toString());
job.setDescription("xx");
job.setCronExpression(cron.toString());
job.setStartTime(job.getStartTime());
return job;
}
public static void timeOut() {
Iterator<Map.Entry<String, Date>> iterator = timeMap.entrySet().iterator();
Calendar calendarOld = Calendar.getInstance();
Calendar calendarNow = Calendar.getInstance();
List<String> list = new ArrayList();
while (iterator.hasNext()) {
Map.Entry<String, Date> next = iterator.next();
String key = next.getKey();//ip
Date value = next.getValue();//时间
calendarOld.setTime(value);
calendarNow.setTime(new Date());
int dayOld = calendarOld.get(Calendar.DATE);
int dayNow = calendarNow.get(Calendar.DATE);
int hoursOld = calendarOld.get(Calendar.HOUR_OF_DAY);
int hoursNow = calendarNow.get(Calendar.HOUR_OF_DAY);
int minutesOld = calendarOld.get(Calendar.MINUTE);
int minutesNow = calendarNow.get(Calendar.MINUTE);
if (dayNow - dayOld > 0) {
list.add(key);
} else {
hoursOld = hoursOld * 60 + minutesOld;
hoursNow = hoursNow * 60 + minutesNow;
//设置多少时间移除
if ((hoursNow - hoursOld) > 40) {
list.add(key);
}
}
}
for (int i = 0; i < list.size(); i++) {
sseCach.remove(list.get(i));
timeMap.remove(list.get(i));
logger.info("===========移除过期用户:{}",list.get(i));
}
}
}
在调用对应的定时接口后,就可以等待程序定时任务的执行即可,会调用定时job
@Component
public class MyJobFirst implements Job {
private static Logger log = LoggerFactory.getLogger(MyJobFirst.class);
private void before() {
log.info("******MyJobFirst任务开始执行******");
}
@Override
public void execute(JobExecutionContext context) {
before();
//定时任务处理的业务逻辑
String name = context.getJobDetail().getKey().getName();
log.info("******MyJobFirst任务[{}]正在执行******", name);
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
String jsonObject = jobDataMap.get("myValue") == null ? "" : jobDataMap.get("myValue").toString();
//SseEmitter sseCache = ToolUtil.sseCach.get(ipAddress);
log.info("MyJobFirst任务[{}]携带的参数[{}]", name, jsonObject);
//拿到所有在线的用户,循环推送公告,如果用户关掉页面,会推送异常,处理一下即可,不影响其他用户的推送
Iterator<Map.Entry<String, SseEmitter>> iterator = ToolUtil.sseCach.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, SseEmitter> next = iterator.next();
SseEmitter value = next.getValue();
//用户ip
String key = next.getKey();
try {
value.send(jsonObject);
log.info("已发送公告用户:{},当前时间:{}", key, new Date());
} catch (Exception e) {
//e.printStackTrace();
log.info("未发送公告用户:{},当前时间:{}", key, new Date());
}
}
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
log.info("当前时间[{}],MyJobFirst任务[{}]的线程名[{}]", time, name, Thread.currentThread().getName());
after();
}
private void after() {
//执行完毕后清除一下
//ToolUtil.sseCach.clear();
log.info("******MyJobFirst任务执行结束******");
}
}
这里其实用的比较简单,还有很多可以优化的地方和功能,比如用户断开即可自动清理,可以实时监测在线时长,人数等