MQ消息处理感悟
项目需要获取MQ消息,之后用xml工具进行解析,再把解析后的数据插入数据库。本来几天做完的项目,期间遇到了各种坑,拖了一个多星期才预发布。 遇到的第一个坑就是收不到MQ消息,后来又只能能收到队列中积压的消息,之后的消息都收不到,这两个都是MQ配置的原因,这里不再详解。 收到消息后需要做的第一件事就是处理xml格式的消息流,最开始用的是jdom来做,由于消息中解析出来的元素较多,最终的效果就是满屏的set函数,代码比较丑陋。在组长的建议下使用了JAXB,最终效果就是下面这个代码片段。
import java.io.File;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
//Unmarshaller
public class Xml2ObjectDemo {
public static void main(String[] args) {
try {
File file = new File("C:\\file.xml");
JAXBContext jaxbContext = JAXBContext.newInstance(Customer.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
Customer customer = (Customer) jaxbUnmarshaller.unmarshal(file);
System.out.println(customer);
} catch (JAXBException e) {
e.printStackTrace();
}
}
}
处理好消息以后,一度以为代码已经完成,接着预发布。结果就是掉入了最大的坑,日志显示就是数据入库异常,在代码中为了让入库的速度更快,采用的是批量入库,就是每次消息处理都是累计到500条记录以后才一起插入数据库,所以使用了一个静态成员来存储之前处理的数据,一直累计,入库后清空。代码片如下:
class MessageHandler{ static Set<UserInfo> userInfoSet=new HashSet<UserInfo>(); /*消息处理函数*/ void dealMessage(String msg){ UserInfo userInfo=XmlParse(msg); userInfoSet.add(userInfo); if(userInfoSet.size()>=200){ dbService.insertUserInfo(userInfoSet); userInfoSet.clear(); } } }
日志异常显示很多set集合入库两次,两次数据完全一样,组长一度以为是因为数据库插入错误。很多次调试之后才发现MQ消息处理是多线程的,即会生成多个MessageHandler实例来处理消息,这样就导致userInfoSet成为了一个多个线程共享的全局变量,在无任何锁的情况下被并发访问。于是这样一种情况成为可能:当userInfoSet的容量达到199时,可能有两个线程同时到达dbService.insertInfo(userInfoSet)
,于是在clear之前数据被入库两次。找出问题之后提出了初版解决方案:
class MessageHandler{ /*ConcurrentSkipListSet是一个线程安全set*/ static Set<UserInfo> userInfoSet=new ConcurrentSkipListSet<UserInfo>(); /*消息处理函数*/ void dealMessage(String msg){ UserInfo userInfo=XmlParse(msg); userInfoSet.add(userInfo); synchronized(userInfoSet){ if(userInfoSet.size()>=200){ dbService.insertUserInfo(userInfoSet); userInfoSet.clear(); } } } }
看起来似乎没有问题了,set中添加元素和求set的大小以及clear等是线程安全的了。然而,这些操作并不处于同一个同步块中,于是这样一种情况成为可能,一个线程刚刚执行完userInfoSet.add(userInfo)
,另一个线程刚好执行userInfoSet.clear()
,于是前面的数据被抹掉了。最终的代码片如下:
class MessageHandler{ /*不需要线程安全set*/ static Set<UserInfo> userInfoSet=new HashSet<UserInfo>(); /*消息处理函数*/ void dealMessage(String msg){ UserInfo userInfo=XmlParse(msg); Set<UserInfo> tmpSet=new HashSet<UserInfo>(); synchronized(userInfoSet){ userInfoSet.add(userInfo); if(userInfoSet.size()>=200){ tmpSet.addAll(userInfoSet); userInfoSet.clear(); } } //将费时的数据库插入移到同步块外面 dbService.insertUserInfo(tmpSet); } }
经过这么周折,感悟就是对多线程对全局变量的操作一定要都在一个锁里面,最好将所有的访问集中在一起加锁。当然,更好的办法是使用同步队列,采用生产者消费者模式。