PyArrow

Read

import os

import pyarrow as pa

import pyarrow.dataset as ds

import pyarrow.compute as pc

import pyarrow.parquet as pq

Read parquet

import s3fs

fs = s3fs.S3FileSystem()


table = pq.read_table("file.parquet")

table = pq.read_table("file.parquet", filesystem=fs) # no s3:// in file name


dataset = ds.dataset("file.parquet", format="parquet")

dataset = ds.dataset("file.parquet", format="parquet", filesystem=fs) # no s3:// in file name
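
A whole directory of parquet files can also be opened as one dataset (sketch, assuming a local directory data_dir/ of parquet files):

dataset = ds.dataset("data_dir/", format="parquet") # every parquet file under data_dir/ becomes part of one dataset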

Filtering columns

table = dataset.to_table(columns=['col1', 'col3'])
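
Column selection also works when reading a single file directly:

table = pq.read_table("file.parquet", columns=['col1', 'col3']) # read only the listed columns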

Filtering rows

table = table.filter(ds.field('col1') == 'a') # filter rows of an in-memory table; expression filters need a recent pyarrow, a boolean mask also works


table = dataset.to_table(filter=ds.field('col1') == 'a')

table = dataset.to_table(filter=ds.field('col1').isin(["a",]))
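
Column projection and a row filter can be combined in one scan (sketch reusing the column names above):

table = dataset.to_table(columns=['col1', 'col3'], filter=ds.field('col2') > 1) # project and filter in a single pass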

From pandas

table = pa.Table.from_pandas(df)
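
from_pandas can carry the pandas index into the table; preserve_index=False drops it:

table = pa.Table.from_pandas(df, preserve_index=False) # don't store the pandas index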

Compute

https://arrow.apache.org/docs/python/compute.html 

import pyarrow as pa

import pyarrow.compute as pc

a = pa.array([1, 1, 2, 3])

pc.sum(a)

pc.cumulative_sum(a)
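
A few more aggregations from the same module (availability can vary with the pyarrow version):

pc.mean(a) # arithmetic mean

pc.min_max(a) # struct scalar holding min and max

pc.value_counts(a) # unique values and their counts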


Store

parquet

pq.write_table(table, "file.parquet")
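
write_table accepts the usual parquet options, e.g. compression (codec support depends on the build):

pq.write_table(table, "file.parquet", compression="zstd") # zstd-compressed parquet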

Create partitioned parquet

d = {'col1': ['a', 'b'], 'col2': [1, 2], 'col3': [1.5, 2.5]}

table = pa.table(d)

pc.sum(table["col2"]) # quick check of the in-memory table before writing it out


os.mkdir("table.parquet") # directory that acts as the dataset root

pq.write_table(table.slice(0, 1), "table.parquet/part.0.parquet") # first row

pq.write_table(table.slice(1, 2), "table.parquet/part.1.parquet") # remaining row
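
The same kind of layout can be produced without manual slicing by partitioning on a column (sketch; partitioned.parquet is just an example output path):

pq.write_to_dataset(table, root_path="partitioned.parquet", partition_cols=["col1"]) # one sub-directory per value of col1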

dtypes

import pyarrow as pa


schema = {

    "col1": pa.string(),

    "col2": pa.int32(),

    "col3": pa.int64(),

    "col4": pa.float32(),

    "col5": pa.float64(),

    "col6": pa.timestamp("ns"),

    "col7": pa.timestamp("s"),

    "col8": pa.timestamp("s", tz="UTC"),

    "col9": pa.duration('s'), # can't store yet

}


schema = pa.schema(schema)
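
A schema can be attached when building a table so dtypes are explicit instead of inferred (small self-contained sketch, not the full schema above):

data = {"col1": ["a", "b"], "col2": [1, 2]}

small_schema = pa.schema({"col1": pa.string(), "col2": pa.int32()})

table = pa.table(data, schema=small_schema) # col2 is int32 instead of the inferred int64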

Conversion

To pandas

df = table.to_pandas()
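
to_pandas can keep arrow-backed dtypes instead of converting to numpy (sketch, assuming pandas >= 2.0):

import pandas as pd

df = table.to_pandas(types_mapper=pd.ArrowDtype) # ArrowDtype columns instead of numpy dtypes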

Utils

See schema

table.schema

print(dataset.schema.to_string(show_field_metadata=False))
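
Individual fields can be inspected as well:

table.schema.field("col1").type # arrow type of a single column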