programing

데이터 프레임을 s3 Python에 직접 csv에 저장

newsource 2023. 7. 4. 21:56

데이터 프레임을 s3 Python에 직접 csv에 저장

새로운 CSV 파일에 업로드하고 싶은 판다 데이터 프레임이 있습니다.문제는 파일을 s3로 전송하기 전에 로컬로 저장하고 싶지 않다는 것입니다.데이터 프레임을 s3에 직접 쓰는 to_csv 같은 방법이 있습니까?저는 boto3를 사용하고 있습니다.
지금까지 제가 가진 것은 다음과 같습니다.

import boto3
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
read_file = s3.get_object(Bucket, Key)
df = pd.read_csv(read_file['Body'])

# Make alterations to DataFrame

# Then export DataFrame to CSV through direct transfer to s3

사용할 수 있는 항목:

from io import StringIO # python3; python2: BytesIO 
import boto3

bucket = 'my_bucket_name' # already created on S3
csv_buffer = StringIO()
df.to_csv(csv_buffer)
s3_resource = boto3.resource('s3')
s3_resource.Object(bucket, 'df.csv').put(Body=csv_buffer.getvalue())

S3 경로를 직접 사용할 수 있습니다.Panda 0.24.1을 사용합니다.

In [1]: import pandas as pd

In [2]: df = pd.DataFrame( [ [1, 1, 1], [2, 2, 2] ], columns=['a', 'b', 'c'])

In [3]: df
Out[3]:
   a  b  c
0  1  1  1
1  2  2  2

In [4]: df.to_csv('s3://experimental/playground/temp_csv/dummy.csv', index=False)

In [5]: pd.__version__
Out[5]: '0.24.1'

In [6]: new_df = pd.read_csv('s3://experimental/playground/temp_csv/dummy.csv')

In [7]: new_df
Out[7]:
   a  b  c
0  1  1  1
1  2  2  2

릴리스 노트:

S3 파일 처리

팬더는 이제 S3 연결을 처리하기 위해 s3fs를 사용합니다.이것은 어떤 코드도 깨지지 않을 것입니다.그러나 s3fs는 필수 종속성이 아니므로 이전 버전의 팬더의 boto처럼 별도로 설치해야 합니다.GH11915.

저는 로컬 파일 시스템처럼 s3를 (거의) 사용할 수 있는 s3fs를 좋아합니다.

다음을 수행할 수 있습니다.

import s3fs

bytes_to_write = df.to_csv(None).encode()
fs = s3fs.S3FileSystem(key=key, secret=secret)
with fs.open('s3://bucket/path/to/file.csv', 'wb') as f:
    f.write(bytes_to_write)

s3fs만 합니다.rb그리고.wb, 을 한 입니다.bytes_to_write 물건들

다음은 보다 최신의 답변입니다.

import s3fs

s3 = s3fs.S3FileSystem(anon=False)

# Use 'w' for py3, 'wb' for py2
with s3.open('<bucket-name>/<filename>.csv','w') as f:
    df.to_csv(f)

StringIO는 여러분의 기억을 갉아먹을 것입니다.이 방법을 사용하면 파일을 문자열로 변환한 다음 s3에 쓰는 것이 아니라 파일을 s3로 스트리밍합니다.판다의 데이터 프레임과 문자열 복사본을 메모리에 저장하는 것은 매우 비효율적으로 보입니다.

ec2 순간에 작업하는 경우 IAM 역할을 지정하여 s3에 작성할 수 있으므로 자격 증명을 직접 전달할 필요가 없습니다.에 전달하여 S3FileSystem()기능.설명서 참조:https://s3fs.readthedocs.io/en/latest/

당신이 면격하를 한다면.None데이터에 대한 첫 번째 인수가 문자열로 반환되기 때문입니다.여기서 S3에 한 번에 업로드하는 것은 쉬운 단계입니다.

또한 a를 통과하는 것이 가능해야 합니다.StringIO…에 이의 to_csv()하지만 끈을 사용하는 것이 더 쉬울 것입니다.

AWS 데이터 랭글러를 사용할 수도 있습니다.

import awswrangler as wr
    
wr.s3.to_csv(
    df=df,
    path="s3://...",
)

다중 부분 업로드를 처리하여 업로드 속도를 높일 수 있습니다.

이 작업은 다음을 사용하여 수행할 수 있습니다.client뿐만 아니라resource.

from io import StringIO
import boto3
s3 = boto3.client("s3",\
                  region_name=region_name,\
                  aws_access_key_id=aws_access_key_id,\
                  aws_secret_access_key=aws_secret_access_key)
csv_buf = StringIO()
df.to_csv(csv_buf, header=True, index=False)
csv_buf.seek(0)
s3.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key='path/test.csv')

저는 AWS 데이터 랭글러를 사용합니다.예:

import awswrangler as wr
import pandas as pd

# read a local dataframe
df = pd.read_parquet('my_local_file.gz')

# upload to S3 bucket
wr.s3.to_parquet(df=df, path='s3://mys3bucket/file_name.gz')

CSV 파일에도 동일하게 적용됩니다.에 에.read_parquet그리고.to_parquet,사용하다read_csv그리고.to_csv적절한 파일 확장자를 사용합니다.

▁you에때를 사용하고 있기 에.boto3.client()예:예:

import boto3
from io import StringIO #python3 
s3 = boto3.client('s3', aws_access_key_id='key', aws_secret_access_key='secret_key')
def copy_to_s3(client, df, bucket, filepath):
    csv_buf = StringIO()
    df.to_csv(csv_buf, header=True, index=False)
    csv_buf.seek(0)
    client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=filepath)
    print(f'Copy {df.shape[0]} rows to S3 Bucket {bucket} at {filepath}, Done!')

copy_to_s3(client=s3, df=df_to_upload, bucket='abc', filepath='def/test.csv')

사용할 수 있습니다.

  • 판다
  • 보토3
  • s3fs(버전 ≤ 0.4)

사용합니다to_csv와 함께s3://과 길에서storage_options

key = "folder/file.csv"

df.to_csv(
    f"s3://{YOUR_S3_BUCKET}/{key}",
    index=False,
    storage_options={
        "key": AWS_ACCESS_KEY_ID,
        "secret": AWS_SECRET_ACCESS_KEY,
        "token": AWS_SESSION_TOKEN,
    },
from io import StringIO
import boto3
#Creating Session With Boto3.
session = boto3.Session(
aws_access_key_id='<your_access_key_id>',
aws_secret_access_key='<your_secret_access_key>'
)
#Creating S3 Resource From the Session.
s3_res = session.resource('s3')
csv_buffer = StringIO()
df.to_csv(csv_buffer)
bucket_name = 'stackvidhya'
s3_object_name = 'df.csv'
s3_res.Object(bucket_name, s3_object_name).put(Body=csv_buffer.getvalue())
print("Dataframe is saved as CSV in S3 bucket.")

소스 MinIO를.minio 내 기능과 같은 파이썬 클라이언트 패키지:

import minio
import os
import pandas as pd

minio_client = minio.Minio(..)

def write_df_to_minio(df, 
                    minio_client, 
                    bucket_name, 
                    file_name="new-file.csv",
                    local_temp_folder="/tmp/", 
                    content_type="application/csv",
                    sep=",",
                    save_row_index=False):

    df.to_csv(os.path.join(local_temp_folder, file_name), sep=sep, index=save_row_index)
    
    minio_results = minio_client.fput_object(bucket_name=bucket_name,
                                             object_name=file_name,
                                             file_path=os.path.join(local_temp_folder, file_name),
                                             content_type=content_type)

    assert minio_results.object_name == file_name

또 다른 옵션은 S3와 Google Cloud Storage 및 Azure Blob Storage를 지원하는 cloudpathlib으로 이 작업을 수행하는 것입니다.아래 예를 참조하십시오.

import pandas as pd
from cloudpathlib import CloudPath

# read data from S3
df = pd.read_csv(CloudPath("s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/states_daily.csv"))

# look at some of the data
df.head(1).T.iloc[:10]
#>                                       0
#> date                           20210307
#> state                                AK
#> positive                        56886.0
#> probableCases                       NaN
#> negative                            NaN
#> pending                             NaN
#> totalTestResultsSource  totalTestsViral
#> totalTestResults              1731628.0
#> hospitalizedCurrently              33.0
#> hospitalizedCumulative           1293.0

# writing to S3
with CloudPath("s3://bucket-you-can-write-to/data.csv").open("w") as f:
    df.to_csv(f)

CloudPath("s3://bucket-you-can-write-to/data.csv").exists()
#> True

참고, 전화할 수 없습니다.df.to_csv(CloudPath("s3://drivendata-public-assets/test-asdf2.csv"))판다가 자신에게 전달된 경로/경로를 처리하는 방식 때문입니다.대신 쓰기 위해 파일을 열고 해당 핸들을 직접 전달해야 합니다.to_csv.

따라서 S3에서 항상 다시 다운로드할 필요가 없도록 특정 옵션이나 다른 인증 메커니즘설정하거나 영구 캐시를 유지하는 몇 가지 추가적인 이점이 있습니다.

S3FS에 문제가 있거나 다음을 수행하는 검사에 문제가 있습니다.Lambda:

각 라이브러리에 대한 레이어를 생성하여 람다에 삽입해야 합니다.

여기에서 도면층을 작성하는 방법을 찾을 수 있습니다.

나는 버킷 s3에서 두 개의 열이 있는 csv를 읽었고, csv 파일의 내용은 판다 데이터 프레임에 넣었습니다.

예:

config.json

{
  "credential": {
    "access_key":"xxxxxx",
    "secret_key":"xxxxxx"
}
,
"s3":{
       "bucket":"mybucket",
       "key":"csv/user.csv"
   }
}

cls_config.json

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import json

class cls_config(object):

    def __init__(self,filename):

        self.filename = filename


    def getConfig(self):

        fileName = os.path.join(os.path.dirname(__file__), self.filename)
        with open(fileName) as f:
        config = json.load(f)
        return config

cls_messages파이의

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pandas as pd
import io

class cls_pandas(object):

    def __init__(self):
        pass

    def read(self,stream):

        df = pd.read_csv(io.StringIO(stream), sep = ",")
        return df

cls_s3.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import boto3
import json

class cls_s3(object):

    def  __init__(self,access_key,secret_key):

        self.s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def getObject(self,bucket,key):

        read_file = self.s3.get_object(Bucket=bucket, Key=key)
        body = read_file['Body'].read().decode('utf-8')
        return body

test.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from cls_config import *
from cls_s3 import *
from cls_pandas import *

class test(object):

    def __init__(self):
        self.conf = cls_config('config.json')

    def process(self):

        conf = self.conf.getConfig()

        bucket = conf['s3']['bucket']
        key = conf['s3']['key']

        access_key = conf['credential']['access_key']
        secret_key = conf['credential']['secret_key']

        s3 = cls_s3(access_key,secret_key)
        ob = s3.getObject(bucket,key)

        pa = cls_pandas()
        df = pa.read(ob)

        print df

if __name__ == '__main__':
    test = test()
    test.process()

언급URL : https://stackoverflow.com/questions/38154040/save-dataframe-to-csv-directly-to-s3-python