处理文件大小为5G+的GPS文本文件,约5千万行左右的数据,包括ID, 经纬度,时间戳,是否载有乘客等信息。如果直接用记事本,notepad++等文件编辑器打开,会导致死机。可以使用vim查看文件信息。
python中提供了三种读取数据的方法:.read(), .readline().readlines()
.read() 返回整个文件,并放到一个字符串变量当中,如果直接读取大文件,会导致死机。
.readline() 每次返回一行的数据
.readlines() 返回整个文件,并按行返回list

根据实际需要,调用.read(size)指定大小读取文件信息最合适。避免.readline()逐行读取耗费时间。按块读取之后数据存放进数据库。在这里使用的是MySQL,因为commit进数据库非常耗时,所以可以每5000行或更多进行一次commit。

下面是整个读取和写入数据库的代码实例:

连接数据库

1
2
3
4
5
6
7
8
9
10
11
import MySQLdb
import time

start = time.time()
DB_HOST = 'localhost'
DB_USER = 'username'
DB_PASSWD = 'passwd'

# connect to MySQL
db = MySQLdb.connect(host = DB_HOST, user = DB_USER, passwd = DB_PASSWD)
cur = db.cursor()

读取文件行数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#file path
path = 'E:/Slides/LBS/TaxiData/20100621.txt'

#count total line number of file
f = open(path, 'r')

def Lines():
cline = 0
while 1:
buffer = f.read(8*1024*1024)
if not buffer:
break
cline += buffer.count("\n")
print cline

f.seek(0)
return cline

在数据库中创建table

1
2
3
4
5
6
7
8
9
#sql create table syntax
sql_create_table = """CREATE TABLE 0401
(taxi_id VARCHAR(10), passenger VARCHAR(2), time_h tinyint, time_m tinyint, time_s tinyint, lon double, lat double)"""

# cur.execute('create database if not exists taxi')
db.select_db('taxi')

# cur.execute("drop table 0401_ave")
cur.execute(sql_create_table)

读取数据并commit进数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
count = 1
countLine = 0

file_line = Lines()

#read lines in file and write insert statements
#insert several lines at one time
class FoundException(Exception): pass

try:
while(1):
sql_insert = """insert into 0401 (taxi_id , passenger , time_h, time_m, time_s, lon, lat) values """
while(count < 5000):
str_ori = f.readline()
#if read to the penultimate line line, change the stop control
if (countLine * 5000 + count) == (file_line - 1):
str_sp = str_ori.split(',')
sql_insert += " ('{0}',{1},{2},{3},{4},{5},{6}),".format(str_sp[0],str_sp[1],str_sp[2],str_sp[3],str_sp[4],str_sp[5],str_sp[6])
commit2db(f, sql_insert)
raise FoundException

str_sp = str_ori.split(',')
sql_insert +=" ('{0}',{1},{2},{3},{4},{5},{6}),".format(str_sp[0],str_sp[1],str_sp[2],str_sp[3],str_sp[4],str_sp[5],str_sp[6])
count = count +1
commit2db(f, sql_insert)
countLine += 1
print "load {0} lines".format(countLine * 5000)
count = 1

except FoundException:
print "the end of file"

#read one line and complete insert sentence
def commit2db(f, sql_insert):
#insert last row, sql append without ","
str_ori = f.readline()
str_sp = str_ori.split(',')
sql_insert += " ('{0}',{1},{2},{3},{4},{5},{6})".format(str_sp[0],str_sp[1],str_sp[2],str_sp[3],str_sp[4],str_sp[5],str_sp[6])
cur.execute(sql_insert)

# print "committed to db"
db.commit()

断开数据库连接

1
2
3
4
5
6
7
cur.close()
db.close()
f.close()

# print computing time
end = time.time()
print "total duration: {0} seconds".format(end - start)

跳到指定的行

如果数据库在运行中间断开连接,需要在终端的位置继续commit,那么就跳到特定的行,继续读入数据进数据库。

1
2
3
4
5
6
7
def jump2line(line):
lineC = 0
while ( lineC < line):
# load lines from a specific line, using when database over loaded for one time
f.readline()
lineC += 1
return f

reference

[1] Python读取大文件(GB)