一. 将数据入库
- 在Linux中将文件进行拆分:split -a 2 -l 1000000 -d test.csv file_ (百万切割csv文件)
- 将文件复制到本地进行解析
- 采用策略使用 java.sql.PreparedStatement 批处理数据(启用事务)
批处理:数据库处理速度极快,单次吞吐量很大,执行效率高,addBatch()将sql装载在一起,一次性送往数据库执行
// 构建连接
private Connection getConnection() throws Exception {
Class.forName(driver);
Connection conn = DriverManager.getConnection(url, user, pwd);
return conn;
}
@Test
public void importCsvFile() {
// 传入文件
String inputFile = "G:\\csvImport\\file_00.csv";
Map<Integer, Test> testInfoMap = new HashMap<>();
Connection con = null;
try {
con = getConnection();
String sql = "insert into test(id, name, info) VALUES (?, ?, ?)";
con.setAutoCommit(false); // 事务处理
PreparedStatement ptatm = con.prepareStatement(sql);
// 读取文件
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(new File(inputFile)));
BufferedReader br = new BufferedReader(new InputStreamReader(bis, StandardCharsets.UTF_8), 10 * 1024 * 1024);
while (br.ready()) {
String line = in.readLine();
try {
String[] split = line.split(",");
Test test= new Test();
test.setId(split[0]); // 数据赋值按需求修改
// 这里使用map是为了防止主键冲突
testInfoMap.put(Integer.valueOf(arr[0]), test);
} catch (Exception e) {
System.out.println("error:" + e + ",line:" + line);
}
}
in.close();
int i = 0; // 用于控制条数
try {
//将内存读取数据,批量写入数据库
for (Map.Entry<Integer, Test> testEntry : testInfoMap.entrySet()) {
Test test= testEntry .getValue();
ptatm.setInt(1, test.getId());
ptatm.setString(2, test.getName().trim());
ptatm.setString(3, test.getInfo());
ptatm.addBatch(); //批量记录到容器里
if (i == 100000) { //当数据读取到10w条则把这部分数据先写入数据库
i = 0; //重置计数器
ptatm.executeBatch(); //执行批量SQL语句
ptatm.clearBatch(); //清除容器中已写入的数据,预备下次存入数据使用
}
i++;
}
if (i < 100000) { //清空剩余数据
ptatm.executeBatch();
ptatm.clearBatch();
}
ptatm.close();
con.commit();
} catch (Exception e) {
ptatm.close();
con.commit();
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
二. 对数据进行过滤处理
由于数据量级较大,对数据进行信息填充需要查询sql以及接口交互,因此采用多线程的方式进行数据二次处理,在进行数据获取时可以通过并行流的方式处理
public void runAndParse() {
System.out.println(new Date());
// 起5个线程进行数据处理
// 控制线程结束
CountDownLatch latch = new CountDownLatch(5);
// 获取执行顺序(原子性)
AtomicInteger queryCnt = new AtomicInteger(0);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
// 默认从1开始处理(page默认为1),当前线程也为1
int curCnt = queryCnt.addAndGet(1);
System.out.println("begin:" + curCnt);
Pager<Test> testPager = testService.queryList(null, curCnt , 10000);
List<Test> testList = testPager.getList();
// 进行数据解析
parseData(list);
System.out.println("done:" + curCnt);
// 线程处理结束标记
latch.countDown();
}).start();
}
// 控制所有线程结束时重新执行递归该方法,直至数据处理完毕(结束标记根据需求设定)
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date());
System.out.println("线程全部执行结束");
runThread();
}
private void parseData(List<Test> list) {
// 使用并行流的方式进行数据获取处理
List<Test> collect = list.parallelStream().peek(this::judgeTest).collect(Collectors.toList());
Connection con = null;
try {
con = getConnection();
// 后续代码跟导入时类似,进行sql处理与批量更新操作
三. 速率对比分析
数据量 | 操作 | 单线程 | 多线程 | 并行流 | 耗时 |
---|---|---|---|---|---|
1w | 查询、更新 | 是 | 否 | 否 | 10min |
1w | 查询、批量更新 | 是 | 否 | 否 | 8min |
1w * 5 | 查询、批量更新 | 否 | 是 | 否 | 13min |
1w * 5 | 查询、批量更新 | 否 | 是 | 是 | 6min |