다운로드
작성자: admin 작성일시: 2017-05-24 14:12:47 조회수: 6350 다운로드: 259
카테고리: Python 태그목록:

Dask 사용법 기초

Pandas 는 데이터베이스나 CSV 파일의 데이터를 모두 메모리로 읽어들인 다음 메모리 위에서 데이터를 처리한다. 하지만 데이터의 양이 많은 경우에는 메모리의 제한으로 데이터프레임을 만들 수 없는 경우가 있다. 또한 데이터프레임의 크기가 너무 크면 질의나 그룹 연산을 할 때 하나의 CPU 코어로 처리하기에는 시간이 너무 많이 걸릴 수도 있다.

이러한 경우에 도움이 되는 것이 Dask 패키지이다. Dask 패키지는 Pandas 데이터프레임 형식으로 빅데이터를 처리하기 위한 파이썬 패키지로 다음과 같은 두 가지 기능을 가진다.

  1. 가상 데이터프레임
  2. 병렬처리용 작업 스케줄러

가상 데이터프레임

Dask 패키지를 사용하면 가상 데이터프레임을 만들 수 있다. 가상 데이터프레임은 Pandas 데이터프레임과 비슷한 기능을 제공하지만 실제로 모든 데이터가 메모리 상에 로드되어 있는 것이 아니라 하나 이상의 파일 혹은 데이터베이스에 존재하는 채로 처리할 수 있는 기능이다. 따라서 메모리 크기와 관계 없이 엄청나게 큰 CSV 파일을 가상 데이터프레임으로 로드하거나 같은 형식의 데이터를 가진 여러개의 CSV 파일을 하나의 가상 데이터프레임에 로드할 수 있다.

실제로 Dask의 가상 데이터프레임을 어떻게 쓸 수 있는지 실습을 통해 알아보자. 실습을 위해 홈 디렉토리 아래에 data라는 임시 디렉토리를 만들고 그곳으로 옮겨 CSV 파일을 하나 만들어 보자.

In [1]:
%%writefile data1.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59
Writing data1.csv

Dask 패키지의 dataframe 서브패키지를 dd라는 이름으로 임포트한다.

In [2]:
import dask.dataframe as dd

read_csv 명령으로 데이터 파일 data1.csv에 대한 가상 데이터프레임 df를 만든다.

In [3]:
df = dd.read_csv("data1.csv")
df
Out:
Dask DataFrame Structure:
time temperature humidity
npartitions=1
int64 int64 int64
... ... ...
Dask Name: from-delayed, 3 tasks

df는 데이터프레임과 유사하지만 실제로 데이터를 메모리에 읽지 않았기 때문에 값은 표시되지 않는다.

head, tail 명령을 내리면 그 때서야 일부 데이터를 읽어서 표시한다.

In [4]:
df.head()
Out:
time temperature humidity
0 0 22 58
1 1 21 57
2 2 25 57
3 3 26 55
4 4 22 53

temperature 열의 평균을 구해보자.

In [5]:
df.temperature.mean()
Out:
dd.Scalar

데이터프레임과 달리 바로 결과가 나오지 않는다. 그 이유는 연산 반환값이 결과가 아닌 작업(task)이기 때문이다. 구체적으로 어떤 작업인지를 보려면 visualize 메서드를 사용하여 작업 그래프(graph)를 볼 수 있다. 작업 그래프란 이 계산을 하기 위해 실제로 CPU가 해야 할 일들의 순서도라고 볼 수 있다.

이 작업의 값을 실제로 구하려면 결과로 받은 작업 객체의 compute 메서드를 호출해야 한다.

In [6]:
df.temperature.mean().compute()
Out:
23.166666666666668

이번에는 이 값을 화씨로 변환해 보자.

In [7]:
(df.temperature * 9 / 5 + 32).compute()
Out:
0    71.6
1    69.8
2    77.0
3    78.8
4    71.6
5    73.4
Name: temperature, dtype: float64

이번에는 이 값으로 원래의 temperature 열을 갱신해보자. 이 때는 Pandas의 문법을 쓰지 못하고 다음과 같이 assign 메서드를 사용해야 한다. assign 메서드를 사용할 때는 compute를 할 필요가 없다.

In [8]:
df = df.assign(temperature=df.temperature * 9 / 5 + 32)
df.head()
Out:
time temperature humidity
0 0 71.6 58
1 1 69.8 57
2 2 77.0 57
3 3 78.8 55
4 4 71.6 53

자료형을 변환하거나 새로운 열을 추가하는 것도 가능하다.

In [9]:
df = df.assign(title=df.temperature.astype(str) + " degree")
df.head()
Out:
time temperature humidity title
0 0 71.6 58 71.6 degree
1 1 69.8 57 69.8 degree
2 2 77.0 57 77.0 degree
3 3 78.8 55 78.8 degree
4 4 71.6 53 71.6 degree

복수 데이터에 대한 가상 데이터프레임

Dask의 가상 데이터프레임이므로 원천 데이터 파일을 하나가 아닌 복수로 설정할 수도 있다. 예를 들어 앞서 보았던 data1.csv 파일 이외에도 다음과 같이 data2.csv', 'data3.csv 파일이 있을 경우, 이 파일을 한 번에 하나의 데이터프레임으로 읽어들일 수도 있다.

In [10]:
%%writefile data2.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59
Writing data2.csv
In [11]:
%%writefile data3.csv
time,temperature,humidity
0,22,58
1,21,57
2,25,57
3,26,55
4,22,53
5,23,59
Writing data3.csv

복수 파일은 와일드카드(*) 기호를 이용하여 읽는다.

In [12]:
df = dd.read_csv('data*.csv')
df.count().compute()
Out:
time           18
temperature    18
humidity       18
dtype: int64
In [13]:
df.temperature.describe().compute()
Out:
count    18.000000
mean     23.166667
std       1.823055
min      21.000000
25%      22.000000
50%      22.500000
75%      24.500000
max      26.000000
dtype: float64

대량 데이터의 병렬 처리

이번에는 Dask로 대량의 데이터를 처리해보자. 샘플로 쓸 데이터는 미국 정부가 발표하는 공개 정보 중 하나로 시카고의 범죄 관련 데이터이다.

다음 명령으로 이 데이터를 다운로드 받을 수 있다. CSV 파일의 크기가 1.3GB가 넘으므로 다운로드에 10분 이상이 걸릴 수도 있다.

!wget -O crime.csv https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

파일을 다운로드 받은 후에는 가상 데이터프레임으로 읽어들인다. 구체적인 데이터를 아직 모르기 때문에 우선 문자열 자료형으로 읽어들인다. 또 error_bad_lines 옵션을 False로 해서 오류가 나는 데이터는 생략하도록 한다.

In [14]:
df = dd.read_csv("crime.csv", dtype=str, error_bad_lines=False, warn_bad_lines=False)
df
Out:
Dask DataFrame Structure:
ID Case Number Date Block IUCR Primary Type Description Location Description Arrest Domestic Beat District Ward Community Area FBI Code X Coordinate Y Coordinate Year Updated On Latitude Longitude Location
npartitions=25
object object object object object object object object object object object object object object object object object object object object object object
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: from-delayed, 75 tasks
In [15]:
df.tail()
Out:
ID Case Number Date Block IUCR Primary Type Description Location Description Arrest Domestic ... Ward Community Area FBI Code X Coordinate Y Coordinate Year Updated On Latitude Longitude Location
249058 11536936 JB555516 11/22/2018 12:00:00 AM 011XX S MAYFIELD AVE 1150 DECEPTIVE PRACTICE CREDIT CARD FRAUD RESIDENCE false false ... 29 25 11 NaN NaN 2018 12/16/2018 04:11:17 PM NaN NaN NaN
249059 11536944 JB555455 12/07/2018 09:00:00 PM 112XX S PARNELL AVE 0486 BATTERY DOMESTIC BATTERY SIMPLE APARTMENT false true ... 34 49 08B NaN NaN 2018 12/16/2018 04:11:17 PM NaN NaN NaN
249060 11536948 JB555563 12/07/2018 06:00:00 PM 083XX S PEORIA ST 2826 OTHER OFFENSE HARASSMENT BY ELECTRONIC MEANS RESIDENCE false false ... 21 71 26 NaN NaN 2018 12/16/2018 04:11:17 PM NaN NaN NaN
249061 24343 JB545894 12/09/2018 04:33:00 AM 021XX N MULLIGAN AVE 0110 HOMICIDE FIRST DEGREE MURDER PORCH false false ... 36 19 01A 1133729 1913666 2018 12/16/2018 04:11:17 PM 41.91931247 -87.784093821 (41.91931247, -87.784093821)
249062 24344 JB546735 12/09/2018 08:49:00 PM 026XX S HOMAN AVE 0110 HOMICIDE FIRST DEGREE MURDER STREET false false ... 22 30 01A 1154122 1886317 2018 12/16/2018 04:11:17 PM 41.843881174 -87.709896602 (41.843881174, -87.709896602)

5 rows × 22 columns

이제 이 데이터프레임으로 분석을 시작하자. 데이터의 크기가 큰 만큼 시간이 오래 걸리기 때문에 Dask는 작업 진행도를 알 수 있는 ProgressBar란 것을 제공한다. 다음과 같이 ProgressBar를 만들고 등록한다.

In [16]:
from dask.diagnostics import ProgressBar

pbar = ProgressBar()
pbar.register()

일단 등록하면 작업의 진행도를 프로그레스 바 형태로 알려준다. 우선 각 열의 데이터 갯수를 세어보자.

In [17]:
%%time
df.count().compute()
[########################################] | 100% Completed | 38.8s
CPU times: user 42.4 s, sys: 5.41 s, total: 47.8 s
Wall time: 38.8 s
Out:
ID                      6766422
Case Number             6766418
Date                    6766422
Block                   6766422
IUCR                    6766422
Primary Type            6766422
Description             6766422
Location Description    6762210
Arrest                  6766422
Domestic                6766422
Beat                    6766422
District                6766375
Ward                    6151568
Community Area          6150393
FBI Code                6766422
X Coordinate            6705637
Y Coordinate            6705637
Year                    6766422
Updated On              6766422
Latitude                6705637
Longitude               6705637
Location                6705637
dtype: int64

각 열의 데이터 수를 세는 데만도 50초가 넘는 시간이 걸렸다.

Dask는 이러한 대량 데이터의 분석 작업을 돕기 위한 작업 스케줄러(task scheduler)라는 것을 제공한다. 작업 스케줄러는 하나의 작업을 여러개의 쓰레드, 프로세스, 노드 등이 나누어 분담하도록 한다.

현재 Dask에서 제공하는 스케줄러의 종류는 다음과 같다.

  • dask.get: 단일 쓰레드
  • dask.threaded.get: 멀티쓰레드 풀(pool)
  • dask.multiprocessing.get: 멀티프로세스 풀
  • distributed.Client.get: 여러대의 컴퓨터에서 분산 처리

병렬처리를 위해서는 어떠한 병렬 처리 방식을 사용할지, 작업 프로세스의 갯수는 어떻게 할지 등은 compute 명령에서 인수로 설정해야 한다. 다음 코드는 멀티프로세싱을 하고 4개의 CPU 코어를 동시에 사용하도록 설정한 예이다. (물론 이 코드가 실행되는 컴퓨터가 실제로 4개 이상의 코어를 가지고 있어야 성능이 개선된다.)

In [18]:
%%time
df.count().compute(scheduler='processes', num_workers=4)
[########################################] | 100% Completed | 17.1s
CPU times: user 450 ms, sys: 180 ms, total: 630 ms
Wall time: 17.2 s
Out:
ID                      6766422
Case Number             6766418
Date                    6766422
Block                   6766422
IUCR                    6766422
Primary Type            6766422
Description             6766422
Location Description    6762210
Arrest                  6766422
Domestic                6766422
Beat                    6766422
District                6766375
Ward                    6151568
Community Area          6150393
FBI Code                6766422
X Coordinate            6705637
Y Coordinate            6705637
Year                    6766422
Updated On              6766422
Latitude                6705637
Longitude               6705637
Location                6705637
dtype: int64

이번에는 처리 속도가 3배 이상 빨라졌음을 확인할 수 있다.

In [19]:
!rm data?.csv

질문/덧글

아직 질문이나 덧글이 없습니다. 첫번째 글을 남겨주세요!