logo

Pandas

王哲峰 / 2022-10-03


目录

DataFrame 多层索引

多层索引取值

下面是一个多索引 DataFrame 数据:

import pandas as pd
import numpy as np

df = pd.DataFrame(
    np.random.randint(50, 100, size = (4, 4)),
    columns = pd.MultiIndex.from_product(
        [["math", "physics"], 
         ["term1", "term2"]]
    ),
    index = pd.MultiIndex.from_tuples(
        [("class1", "LiLei"), 
         ("class2", "HanMeiMei"), 
         ("class2", "LiLei"), 
         ("class2", "HanMeiMei")]
    )
)
df.index.names = ["class", "name"]
print(df)
                  math       physics      
                 term1 term2   term1 term2
class  name                               
class1 LiLei        88    78      64    92
class2 HanMeiMei    57    70      88    68
       LiLei        72    60      60    73
       HanMeiMei    85    89      73    52

根据行索引查询

  1. 取外层索引为 class1 的数据:
print(df.loc["class1"])
       math       physics      
      term1 term2   term1 term2
name                           
LiLei    88    78      64    92
  1. 同时根据多个索引筛选取值:
print(df.loc[("class2", "HanMeiMei")])
                  math       physics      
                 term1 term2   term1 term2
class  name                               
class2 HanMeiMei    57    70      88    68
       HanMeiMei    85    89      73    52
  1. 同时根据多个索引筛选取值,这个方法不会带上外层索引:
print(df.loc["class2"].loc["HanMeiMei"])
           math       physics      
          term1 term2   term1 term2
name                               
HanMeiMei    57    70      88    68
HanMeiMei    85    89      73    52
  1. 根据内层索引取值,先交换内外层索引位置:
print(df.swaplevel())
                  math       physics      
                 term1 term2   term1 term2
name      class                           
LiLei     class1    88    78      64    92
HanMeiMei class2    57    70      88    68
LiLei     class2    72    60      60    73
HanMeiMei class2    85    89      73    52
  1. 通过取外层索引取值
print(df.swaplevel().loc["HanMeiMei"])
        math       physics      
       term1 term2   term1 term2
class                           
class2    57    70      88    68
class2    85    89      73    52

根据 column 查询

  1. 根据外层 column 取值
print(df["math"])
                  term1  term2
class  name                   
class1 LiLei         88     78
class2 HanMeiMei     57     70
       LiLei         72     60
       HanMeiMei     85     89
  1. 根据多层 column 联合取值
print(df["math", "term2"])
print(df.loc[:, ("math", "term2")])
print(df["math"]["term2"])
print(df[("math", "term2")])
class   name     
class1  LiLei        78
class2  HanMeiMei    70
        LiLei        60
        HanMeiMei    89
Name: (math, term2), dtype: int32
  1. 取内层索引先交换轴
print(df.swaplevel(axis = 1))
print(df.swaplevel(axis = 1)["term1"])
                  math  physics
class  name                    
class1 LiLei        88       64
class2 HanMeiMei    57       88
       LiLei        72       60
       HanMeiMei    85       73

索引与列转换

下面是一个单索引 DataFrame 数据:

df = pd.DataFrame({
    "X": range(5),
    "Y": range(5),
    "S": list("aaabb"),
    "Z": [1, 1, 2, 2, 2],
})
print(df)
   X  Y  S  Z
0  0  0  a  1
1  1  1  a  1
2  2  2  a  2
3  3  3  b  2
4  4  4  b  2

列转化为索引

  1. 指定某一列为索引
print(df.set_index("S"))
   X  Y  Z
S         
a  0  0  1
a  1  1  1
a  2  2  2
b  3  3  2
b  4  4  2
  1. 指定某一列为索引,并保留索引列
print(df.set_index("S", drop = False))
   X  Y  S  Z
S            
a  0  0  a  1
a  1  1  a  1
a  2  2  a  2
b  3  3  b  2
b  4  4  b  2
  1. 指定多行列作为索引,并保留索引列
print(df.set_index(["S", "Z"], drop = False))
     X  Y  S  Z
S Z            
a 1  0  0  a  1
  1  1  1  a  1
  2  2  2  a  2
b 2  3  3  b  2
  2  4  4  b  2

索引转换为列

  1. 指定多行列作为索引
df_multiindex = df.set_index(["S", "Z"])
print(df_multiindex)
     X  Y
S Z      
a 1  0  0
  1  1  1
  2  2  2
b 2  3  3
  2  4  4
  1. 将单个索引作为 DataFrame 对象的列
print(df_multiindex.reset_index("Z"))
   Z  X  Y
S         
a  1  0  0
a  1  1  1
a  2  2  2
b  2  3  3
b  2  4  4
  1. 将多级索引作为列
print(df_multiindex.reset_index())
   S  Z  X  Y
0  a  1  0  0
1  a  1  1  1
2  a  2  2  2
3  b  2  3  3
4  b  2  4  4
  1. 删除对指定索引,以上操作都不会直接对原 DataFrame 进行修改,若要直接对原 DataFrame 进行修改, 加上参数 inplace=True
df_multiindex.reset_index(inplace = True)
print(df_multiindex)
   S  Z  X  Y
0  a  1  0  0
1  a  1  1  1
2  a  2  2  2
3  b  2  3  3
4  b  2  4  4

Pyjanitor

pyjanitor 库的灵感来自于 R 语言的 janitor 包,英文单词即为清洁工之意,也就是通常用来进行数据处理或清洗数据。 pyjanitor 脱胎于 Pandas 生态圈,其使用的核心也是围绕着链式展开,可以使得我们更加专注于每一步操作的动作或谓词(Verbs)

pyjanitor 的 API 文档并不复杂,大多数 API 都是围绕着通用的清洗任务而设计。这主要涉及为几部分:

需要注意的是,尽管 pyjanitor 库名称带有 py 二字,但是在导入时则是输入 janitor; 就像 Beautifulsoup4 库在导入时写为 bs4 一样,以免无法导入而报错

coalesc

示例:比如我数据中有两个字段 a 和 b,但是两个字段或多或少都有缺失值。 需要定义一个新的字段 c,它由两个字段构建而来。如果第一个字段中存在缺失值, 则取第二个字段中的值,反之亦可;如果两者都为缺失,则保留缺失值

示例数据:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    "a": [None, 2, None, None, 5, 6],
    "b": [1, None, None, 4, None, 6],
})
df

pandas 加 apply 实现:

import pandas as pd

def get_valid_value(col_x, col_y):
    if not pd.isna(col_x) and pd.isna(col_y):
        return col_x
    elif pd.isna(col_x) and not pd.isna(col_y):
        return col_y
    elif not (pd.isna(col_x) or pd.isna(col_y)):
        return col_x
    else:
        return np.nan

df["c"] = df.apply(lambda x: get_valid_value(x["a"], x["b"]), axis = 1)
df

pyjanitor coalesc 实现:

import janitor

df.coalesc(
    column_names = ["a", "b"],
    new_column_name = "c",
    delete_columns = False,
)

concatenate_columns 和 deconcatenate_column

import pandas as pd
import janitor

df = pd.DataFrame({
    "date_time": [
        "2020-02-01 11:00:00",
        "2020-02-03 12:10:11",
        "2020-03-24 13:24:31"
    ]
})

(
    df
        .deconcatenate_column(
            column_name = "date_time",
            new_column_names = ["date", "time"],
            sep = " ",
            preserve_position = False,
        )
        .deconcatenate_column(
            column_name = "date",
            new_column_names = ["year", "month", "day"],
            sep = "-",
            preserve_position = True,
        )
        .concatenate_columns(
            column_names = ["year", "month", "day"],
            new_column_name = "new_date",
            sep = "-",
        )
)

take_first

有的时候,会 groupby() 某个字段并对一些数值列进行操作、倒序排列, 最后每组取最大的数即倒序后的第一行。在 R 语言中可以很轻易直接这么实现:

library(dplyr)

df <- data.frame(
    a = c("x", "x", "y", "y", "y"),
    b = c(1, 3, 2, 5, 4)
)

df %>% 
    group_by(a) %>%
    arrange(desc(b)) %>%
    slice(1) %>%
    ungroup()

在没使用 pyjanitor 之前,我往往都是通过 Pandas 这么实现的:

import pandas as pd

df = pd.DataFrame({
    "a": ["x", "x", "y", "y", "y"],
    "b": [1, 3, 2, 5, 4]
})

(
    df
        .groupby("a")
        .apply(lambda grp: grp.sort_values(by = "b", ascending = False).head(1))
        .reset_index(drop = True)
)

这里利用了 groupby 之后的生成的 DataFrameGroupBy 对象再进行多余的降序取第一个的操作, 最后将分组后产生的索引值删除。现在可以直接使用 pyjanitor 中的 take_first 方法直接一步到位:

import pandas as pd
import janitor

df = pd.DataFrame({
    "a": ["x", "x", "y", "y", "y"],
    "b": [1, 3, 2, 5, 4]
})

df.take_first(subset = "a", by = "b", ascending = False)

自定义 janitor

pyjanitor 中的方法仅仅只是一些通用的实现方法,不同的人在使用过程中可能也会有不同的需要。 但好在我们也可以实现自己的 janitor 方法。pyjanitor 得益于 pandas-flavor 库的加持得以轻松实现链式方法

pandas-flavor 提供了能让使用者简单且快速地编写出带有 Pandas 味儿的方法:

本质上来说,pandas-flavor 库中提供的装饰器就等价于重写或新增了 DataFrame 类的方法, 在使用过程中如果方法有报错,那就需要还原加载 pandas 库之后再重新写入

关于 pandas-flavor 装饰器的用法,详见项目的 Github(https://github.com/Zsailer/pandas_flavor)

比如我们写一个简单清理数据字段或变量名称多余空格的方法:

import pandas as pd
import pandas_flavor as pf

@pf.register_dataframe_method
def strip_names(df):
    import re

    colnames = df.columns.tolist()
    colnames = list(map(lambda col: '_'.join(re.findall(r"\w+", col)), colnames))
    df.columns = colnames
    return df
data = pd.DataFrame({
    " a ": [1, 1],
    "  b  zz  ": [2, 1],
})
data
dta.strip_names()

最佳实践

pipe

pipe 即为管道, 把前一项输出的 df,作为后一项输入的 df, 同时把 df 操作函数对象作为第一参数,它所需的参数以 argskwargs 形式传入。 这样能避免产生中间的 df。当参数复杂(比如是巨大的 dictionary,或者是一连串函数计算后的结果)、 或高阶方法多时,比直接 chaining 可读性高

在每次分析工作完成后,把琐碎的数据清理工作以 pipe 的形式放在数据导入后的下一步

dtype_mapping = {
    "a": np.float32,
    "b": np.uint8,
    "c": np.float64,
    "d": np.int64,
    "e": str,
}
df_cleaned = (df
    .pipe(pd.DataFrame.sort_index, ascending = False)  # 按索引排序
    .pipe(pd.DataFrame.fillna, value = 0, method = "ffill")  # 缺失值处理
    .pipe(pd.DataFrame.astype, dtype_mapping)  # 数据类型转换
    .pipe(pd.DataFrame.clip, lower = -0.05, upper = 0.05)  # 极端值处理
)

封装成一个函数:

def clean_data(df):
    df_cleaned = (df
        .pipe(pd.DataFrame.sort_index, ascending = False)  # 按索引排序
        .pipe(pd.DataFrame.fillna, value = 0, method = "ffill")  # 缺失值处理
        .pipe(pd.DataFrame.astype, dtype_mapping)  # 数据类型转换
        .pipe(pd.DataFrame.clip, lower = -0.05, upper = 0.05)  # 极端值处理
    )
    return df_cleaned

assign

可以使用 assign 方法,把一些列生成操作集中在一起。 和直接用 df['x'] = ... 不同的是 assign 方法会生成一个新的 df, 原始的 df 不会变 ,不会有 setting with copy warning, 还有一个好处,就是不会因为生成新的操作而打断函数 chaining

df = pd.DataFrame(
    data = 25 + 5 * np.random.randn(10),
    columns = ["temp_c"]
)

df_new = df.assign(
    temp_f = lambda x: x["temp_c"] * 9 / 5 + 32,
    temp_k = lambda x: (x["temp_f"] + 459.67) * 5 / 9,
)

query

query 可以解决很多条件的筛选问题,明显 query 方法简洁,而且条件越多, 逻辑判断越多,可读性优势就越明显(前提是单个条件都是简单的判断)

df = pd.DataFrame(
    data = np.random.randn(10, 3),
    columns = list("abc")
)

# 普通方法
df.loc[((df["a"] > 0) & (df["b"] < 0.05)) | (df["c"] > df["b"])]

# query
df.query("(a > 0 and b < 0.05) or c > b")

resample

groupby 和 transform

向量化计算

assign 和 numpy select

在下面 df 上成列 c,如果同行列 a 的值 >0.5 并且 <0.9,那么列 c 同行的值等于列 b, 否则为 None

a          b
2007-01-08  0.786667        270
2007-01-09  0.853333        280
2007-01-10  0.866667        282
2007-01-11  0.880000        277
2007-01-12  0.880000        266
2007-01-15  0.866667        279

df.where 是最直接的解法,但是 where 有缺点,就是一次只能处理一个条件。 就是 condition1 满足,赋值 v1。不满足则 other

还有一种用 numpy 的 select。这个方法的好处是可以给定任意条件,并匹配对应的值。 满足条件 1,赋值 v1;满足条件 2,赋值 v2…。如果条件多了,也能一次完成赋值操作

普通方法:

def abcd_to_e(x):
    if x['a']>1:
        return 1
    elif x['b']<0:
        return x['b']
    elif x['a']>0.1 and x['b']<10:
        return 10
    elif ...:
        return 1000
    elif ...
    
    else: 
        return x['c']

df.apply(abcd_to_e, axis = 1)

numpy select 方法:

np.random.seed(123)

df = pd.DataFrame(
    np.random.randn(10, 2),
    columns = list("ab")
)

df.assign(c = np.select(
    [(df.a > 0.5) & (df.b < 0.9)], 
    [df.b], 
    default = np.nan
))

timeseries

一年的时间序列数据,读取每月第一天的数据

index = pd.date_range("01/01/2021", "12/31/2021")
df = pd.DataFrame(
    data = np.random.randn(index.size, 3),
    index = index,
    columns = list("abc")
)
# 删除所有单数月份的1号
df_droped = df.drop(
    labels = [datetime.datetime(2021, i, 1) for i in range(1, 13, 2)]
)

方法 1:

df_droped.loc[df_droped.index.day == 1]

方法 2:

df_droped.resample("MS").first()
df_droped.resample("MS").agg("first")

方法 3:

df_droped.asfreq("MS")

方法 4:

df_droped.groupby(pd.Grouper(freq = "MS")).first()

参考