深入Sqlite多线程入库的问题
程序员文章站
2023-12-17 14:02:16
今天经理给了我一个三十多m的sql文件,让我测试数据定位的问题。按照惯例,我使用navicat for sqlite创建一个表,然后将sql文件导入。我然后去干其他事儿了,...
今天经理给了我一个三十多m的sql文件,让我测试数据定位的问题。按照惯例,我使用navicat for sqlite创建一个表,然后将sql文件导入。我然后去干其他事儿了,大约过了一个多小时,我想数据应该导入的差不多了吧。我打开一看,汗,死在那儿了。我关掉软件又重新导入一遍,还是那个德行。又得知经理曾经自己也导过,没有成功。看来,用工具导入的方法行不通了。
但是,想想就十多万条数据,就是十多万条insert sql语句,有那么难吗?于是,我想还是自己写一个程序导入吧。虽然中间也遇到一些小插曲,但是还是成功地把数据导进去了。
程序的代码如下:
package com.geoway.pad.common.tool;
import java.io.bufferedreader;
import java.io.file;
import java.io.fileinputstream;
import java.io.filenotfoundexception;
import java.io.ioexception;
import java.io.inputstream;
import java.io.inputstreamreader;
import java.sql.connection;
import java.sql.drivermanager;
import java.sql.sqlexception;
import java.sql.statement;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
/**
* @author likehua
* @note sqlite建库以及批量入库
* */
public class batchtool{
//ddl
private static string ddl="create table if not exists pbeijing_point (objectid integer,name text,address text,phone text,fax text,type text,citycode text,url text,email text,name2 text,x integer,y integer)";
connection jcon=null;
//get connection
public synchronized connection getconnection(){
if(jcon==null){
// json=
statement state=null;
try {
class.forname("org.sqlite.jdbc");
jcon=drivermanager.getconnection("jdbc:sqlite:c:\\newd.db");
state=jcon.createstatement();
state.executeupdate(ddl);
} catch (sqlexception e) {
e.printstacktrace();
} catch (classnotfoundexception e) {
e.printstacktrace();
}
}
return jcon;
}
//创建500个线程
executorservice service=executors.newfixedthreadpool(500);
//读取sql文件 每五百个insert 语句由一个线程批量操作
public void readbatchsql(inputstream is) throws ioexception{
bufferedreader bufferreader=new bufferedreader(new inputstreamreader(is,"utf-8"));
string line;
string one="";
int tag=0;
string batchsql="";
while((line=bufferreader.readline())!=null){
one+=line;
if(one.indexof(";")!=-1){
batchsql+=one;
one="";//reset
tag++;
};
//符合条件 开辟一个线程
if(tag!=0&&tag/500!=0){
service.execute(new sqlitebatchhandler(batchsql));
batchsql="";//reset
tag=0;//reset
}
}
//最后执行 剩余的sql
if(batchsql.length()>0){
system.out.println("finalsql:"+batchsql);
runnable r=new sqlitebatchhandler(batchsql);
service.execute(r);
};
try {
//关闭线程池
this.service.shutdown();
this.service.awaittermination(1, timeunit.hours);<br> getconnection().close();<br> } catch (interruptedexception e) {
e.printstacktrace();
} catch (sqlexception e) {
e.printstacktrace();
}
};
/**
* @note 分割sql
* */
private static string[] splitsql(string batchsql){
if(batchsql!=null){
return batchsql.split(";");
};
return null;
}
/**
* @note 执行批量更新操作
* 由于connection.comit 操作时 如果存在 statement没有close 就会报错 因此将此方法加上同步 。
* */
private synchronized void exucteupdate(string batch){
statement ste=null;
connection con=null;
try{
con=getconnection();
con.setautocommit(false);
ste=con.createstatement();
string[] sqls=this.splitsql(batch);
for(string sql:sqls){
if(sql!=null){
ste.addbatch(sql);
};
};
ste.executebatch();<br> ste.close();
con.commit();//提交
}catch(exception e){
e.printstacktrace();
system.out.println("执行失败:"+batch);
try {
con.rollback();//回滚
} catch (sqlexception e1) {
e1.printstacktrace();
}
}finally{
if(ste!=null){
try {
ste.close();
} catch (sqlexception e) {
e.printstacktrace();
}
}
}
}
/**
* @author likehua
* @note 入库线程
* */
private class sqlitebatchhandler implements runnable{
private string batch;
public sqlitebatchhandler(string sql){
this.batch=sql;
};
@suppresswarnings("static-access")
@override
public void run() {
try {
thread.currentthread().sleep(50);
} catch (interruptedexception e) {
e.printstacktrace();
}
if(this.batch.length()>0){
exucteupdate(batch);
};
}
}
/**
* @author likehua
* @note 主函数入口
* */
public static void main(string[] args) throws filenotfoundexception, ioexception{
batchtool s=new batchtool();
s.readbatchsql(new fileinputstream(new file("c:\\poi.sql")));
}
}
但是,想想就十多万条数据,就是十多万条insert sql语句,有那么难吗?于是,我想还是自己写一个程序导入吧。虽然中间也遇到一些小插曲,但是还是成功地把数据导进去了。
程序的代码如下:
复制代码 代码如下:
package com.geoway.pad.common.tool;
import java.io.bufferedreader;
import java.io.file;
import java.io.fileinputstream;
import java.io.filenotfoundexception;
import java.io.ioexception;
import java.io.inputstream;
import java.io.inputstreamreader;
import java.sql.connection;
import java.sql.drivermanager;
import java.sql.sqlexception;
import java.sql.statement;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.timeunit;
/**
* @author likehua
* @note sqlite建库以及批量入库
* */
public class batchtool{
//ddl
private static string ddl="create table if not exists pbeijing_point (objectid integer,name text,address text,phone text,fax text,type text,citycode text,url text,email text,name2 text,x integer,y integer)";
connection jcon=null;
//get connection
public synchronized connection getconnection(){
if(jcon==null){
// json=
statement state=null;
try {
class.forname("org.sqlite.jdbc");
jcon=drivermanager.getconnection("jdbc:sqlite:c:\\newd.db");
state=jcon.createstatement();
state.executeupdate(ddl);
} catch (sqlexception e) {
e.printstacktrace();
} catch (classnotfoundexception e) {
e.printstacktrace();
}
}
return jcon;
}
//创建500个线程
executorservice service=executors.newfixedthreadpool(500);
//读取sql文件 每五百个insert 语句由一个线程批量操作
public void readbatchsql(inputstream is) throws ioexception{
bufferedreader bufferreader=new bufferedreader(new inputstreamreader(is,"utf-8"));
string line;
string one="";
int tag=0;
string batchsql="";
while((line=bufferreader.readline())!=null){
one+=line;
if(one.indexof(";")!=-1){
batchsql+=one;
one="";//reset
tag++;
};
//符合条件 开辟一个线程
if(tag!=0&&tag/500!=0){
service.execute(new sqlitebatchhandler(batchsql));
batchsql="";//reset
tag=0;//reset
}
}
//最后执行 剩余的sql
if(batchsql.length()>0){
system.out.println("finalsql:"+batchsql);
runnable r=new sqlitebatchhandler(batchsql);
service.execute(r);
};
try {
//关闭线程池
this.service.shutdown();
this.service.awaittermination(1, timeunit.hours);<br> getconnection().close();<br> } catch (interruptedexception e) {
e.printstacktrace();
} catch (sqlexception e) {
e.printstacktrace();
}
};
/**
* @note 分割sql
* */
private static string[] splitsql(string batchsql){
if(batchsql!=null){
return batchsql.split(";");
};
return null;
}
/**
* @note 执行批量更新操作
* 由于connection.comit 操作时 如果存在 statement没有close 就会报错 因此将此方法加上同步 。
* */
private synchronized void exucteupdate(string batch){
statement ste=null;
connection con=null;
try{
con=getconnection();
con.setautocommit(false);
ste=con.createstatement();
string[] sqls=this.splitsql(batch);
for(string sql:sqls){
if(sql!=null){
ste.addbatch(sql);
};
};
ste.executebatch();<br> ste.close();
con.commit();//提交
}catch(exception e){
e.printstacktrace();
system.out.println("执行失败:"+batch);
try {
con.rollback();//回滚
} catch (sqlexception e1) {
e1.printstacktrace();
}
}finally{
if(ste!=null){
try {
ste.close();
} catch (sqlexception e) {
e.printstacktrace();
}
}
}
}
/**
* @author likehua
* @note 入库线程
* */
private class sqlitebatchhandler implements runnable{
private string batch;
public sqlitebatchhandler(string sql){
this.batch=sql;
};
@suppresswarnings("static-access")
@override
public void run() {
try {
thread.currentthread().sleep(50);
} catch (interruptedexception e) {
e.printstacktrace();
}
if(this.batch.length()>0){
exucteupdate(batch);
};
}
}
/**
* @author likehua
* @note 主函数入口
* */
public static void main(string[] args) throws filenotfoundexception, ioexception{
batchtool s=new batchtool();
s.readbatchsql(new fileinputstream(new file("c:\\poi.sql")));
}
}