diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_s3fs.pyx | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_s3fs.pyx')
-rw-r--r-- | src/arrow/python/pyarrow/_s3fs.pyx | 284 |
1 file changed, 284 insertions, 0 deletions
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata,
                          pyarrow_unwrap_metadata)
from pyarrow.lib import frombytes, tobytes, KeyValueMetadata
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_fs cimport *
from pyarrow._fs cimport FileSystem


# Python-visible mirror of the C++ S3 log levels; each member carries the
# corresponding CS3LogLevel_* value so it can be cast straight through to C++.
cpdef enum S3LogLevel:
    Off = <int8_t> CS3LogLevel_Off
    Fatal = <int8_t> CS3LogLevel_Fatal
    Error = <int8_t> CS3LogLevel_Error
    Warn = <int8_t> CS3LogLevel_Warn
    Info = <int8_t> CS3LogLevel_Info
    Debug = <int8_t> CS3LogLevel_Debug
    Trace = <int8_t> CS3LogLevel_Trace


def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
    """
    Initialize S3 support

    Parameters
    ----------
    log_level : S3LogLevel
        level of logging

    Raises
    ------
    ArrowException
        If the underlying C++ initialization (CInitializeS3) fails.
    """
    cdef CS3GlobalOptions options
    options.log_level = <CS3LogLevel> log_level
    check_status(CInitializeS3(options))


def finalize_s3():
    """
    Finalize S3 support.

    Delegates to the C++ CFinalizeS3() and raises if it reports an error.
    """
    check_status(CFinalizeS3())


cdef class S3FileSystem(FileSystem):
    """
    S3-backed FileSystem implementation

    If neither access_key nor secret_key are provided, and role_arn is also not
    provided, then attempts to initialize from AWS environment variables,
    otherwise both access_key and secret_key must be provided.

    If role_arn is provided instead of access_key and secret_key, temporary
    credentials will be fetched by issuing a request to STS to assume the
    specified role.

    Note: S3 buckets are special and the operations available on them may be
    limited or more expensive than desired.

    Parameters
    ----------
    access_key : str, default None
        AWS Access Key ID. Pass None to use the standard AWS environment
        variables and/or configuration file.
    secret_key : str, default None
        AWS Secret Access key. Pass None to use the standard AWS environment
        variables and/or configuration file.
    session_token : str, default None
        AWS Session Token. An optional session token, required if access_key
        and secret_key are temporary credentials from STS.
    anonymous : boolean, default False
        Whether to connect anonymously if access_key and secret_key are None.
        If true, will not attempt to look up credentials using standard AWS
        configuration methods.
    role_arn : str, default None
        AWS Role ARN. If provided instead of access_key and secret_key,
        temporary credentials will be fetched by assuming this role.
    session_name : str, default None
        An optional identifier for the assumed role session.
    external_id : str, default None
        An optional unique identifier that might be required when you assume
        a role in another account.
    load_frequency : int, default 900
        The frequency (in seconds) with which temporary credentials from an
        assumed role session will be refreshed.
    region : str, default 'us-east-1'
        AWS region to connect to.
    scheme : str, default 'https'
        S3 connection transport scheme.
    endpoint_override : str, default None
        Override region with a connect string such as "localhost:9000"
    background_writes : boolean, default True
        Whether file writes will be issued in the background, without
        blocking.
    default_metadata : mapping or KeyValueMetadata, default None
        Default metadata for open_output_stream. This will be ignored if
        non-empty metadata is passed to open_output_stream.
    proxy_options : dict or str, default None
        If a proxy is used, provide the options here. Supported options are:
        'scheme' (str: 'http' or 'https'; required), 'host' (str; required),
        'port' (int; required), 'username' (str; optional),
        'password' (str; optional).
        A proxy URI (str) can also be provided, in which case these options
        will be derived from the provided URI.
        The following are equivalent::

            S3FileSystem(proxy_options='http://username:password@localhost:8020')
            S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost',
                                        'port': 8020, 'username': 'username',
                                        'password': 'password'})
    """

    cdef:
        # Borrowed, typed view of the wrapped C++ filesystem; set in init()
        # from the shared_ptr held by the FileSystem base class.
        CS3FileSystem* s3fs

    def __init__(self, *, access_key=None, secret_key=None, session_token=None,
                 bint anonymous=False, region=None, scheme=None,
                 endpoint_override=None, bint background_writes=True,
                 default_metadata=None, role_arn=None, session_name=None,
                 external_id=None, load_frequency=900, proxy_options=None):
        cdef:
            CS3Options options
            shared_ptr[CS3FileSystem] wrapped

        # Credential selection: exactly one of {explicit keys, anonymous,
        # assume-role, default AWS chain}. Explicit keys require BOTH parts.
        if access_key is not None and secret_key is None:
            raise ValueError(
                'In order to initialize with explicit credentials both '
                'access_key and secret_key must be provided, '
                '`secret_key` is not set.'
            )
        elif access_key is None and secret_key is not None:
            raise ValueError(
                'In order to initialize with explicit credentials both '
                'access_key and secret_key must be provided, '
                '`access_key` is not set.'
            )

        # A session token is only meaningful alongside explicit credentials.
        elif session_token is not None and (access_key is None or
                                            secret_key is None):
            raise ValueError(
                'In order to initialize a session with temporary credentials, '
                'both secret_key and access_key must be provided in addition '
                'to session_token.'
            )

        elif (access_key is not None or secret_key is not None):
            # Explicit credentials are mutually exclusive with anonymous
            # access and with role assumption.
            if anonymous:
                raise ValueError(
                    'Cannot pass anonymous=True together with access_key '
                    'and secret_key.')

            if role_arn:
                raise ValueError(
                    'Cannot provide role_arn with access_key and secret_key')

            # The C++ API takes a (possibly empty) token, not a null.
            if session_token is None:
                session_token = ""

            options = CS3Options.FromAccessKey(
                tobytes(access_key),
                tobytes(secret_key),
                tobytes(session_token)
            )
        elif anonymous:
            if role_arn:
                raise ValueError(
                    'Cannot provide role_arn with anonymous=True')

            options = CS3Options.Anonymous()
        elif role_arn:
            # Temporary credentials via STS AssumeRole, refreshed every
            # load_frequency seconds.
            options = CS3Options.FromAssumeRole(
                tobytes(role_arn),
                tobytes(session_name),
                tobytes(external_id),
                load_frequency
            )
        else:
            # Fall back to the standard AWS credential resolution chain.
            options = CS3Options.Defaults()

        # Optional connection settings are applied only when given, leaving
        # the C++ defaults in place otherwise.
        if region is not None:
            options.region = tobytes(region)
        if scheme is not None:
            options.scheme = tobytes(scheme)
        if endpoint_override is not None:
            options.endpoint_override = tobytes(endpoint_override)
        if background_writes is not None:
            options.background_writes = background_writes
        if default_metadata is not None:
            # Coerce arbitrary mappings to KeyValueMetadata before unwrapping
            # into the C++ options struct.
            if not isinstance(default_metadata, KeyValueMetadata):
                default_metadata = KeyValueMetadata(default_metadata)
            options.default_metadata = pyarrow_unwrap_metadata(
                default_metadata)

        if proxy_options is not None:
            if isinstance(proxy_options, dict):
                # 'scheme', 'host' and 'port' are required keys (KeyError if
                # missing); 'username'/'password' are optional.
                options.proxy_options.scheme = tobytes(proxy_options["scheme"])
                options.proxy_options.host = tobytes(proxy_options["host"])
                options.proxy_options.port = proxy_options["port"]
                proxy_username = proxy_options.get("username", None)
                if proxy_username:
                    options.proxy_options.username = tobytes(proxy_username)
                proxy_password = proxy_options.get("password", None)
                if proxy_password:
                    options.proxy_options.password = tobytes(proxy_password)
            elif isinstance(proxy_options, str):
                # Parse a proxy URI string into the same options struct.
                options.proxy_options = GetResultValue(
                    CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
            else:
                raise TypeError(
                    "'proxy_options': expected 'dict' or 'str', "
                    f"got {type(proxy_options)} instead.")

        # Construct the C++ filesystem with the GIL released; Make() may
        # perform network/credential work.
        with nogil:
            wrapped = GetResultValue(CS3FileSystem.Make(options))

        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Let the base class own the shared_ptr, then keep a typed raw
        # pointer to avoid repeated downcasts.
        FileSystem.init(self, wrapped)
        self.s3fs = <CS3FileSystem*> wrapped.get()

    @classmethod
    def _reconstruct(cls, kwargs):
        # Unpickling entry point used by __reduce__ below: rebuild the
        # filesystem from the captured constructor keyword arguments.
        return cls(**kwargs)

    def __reduce__(self):
        # Pickle support: serialize as (_reconstruct, (kwargs,)) so the
        # filesystem is re-created from its options on the other side.
        cdef CS3Options opts = self.s3fs.options()

        # if creds were explicitly provided, then use them
        # else obtain them as they were last time.
        if opts.credentials_kind == CS3CredentialsKind_Explicit:
            access_key = frombytes(opts.GetAccessKey())
            secret_key = frombytes(opts.GetSecretKey())
            session_token = frombytes(opts.GetSessionToken())
        else:
            access_key = None
            secret_key = None
            session_token = None

        return (
            S3FileSystem._reconstruct, (dict(
                access_key=access_key,
                secret_key=secret_key,
                session_token=session_token,
                anonymous=(opts.credentials_kind ==
                           CS3CredentialsKind_Anonymous),
                region=frombytes(opts.region),
                scheme=frombytes(opts.scheme),
                endpoint_override=frombytes(opts.endpoint_override),
                role_arn=frombytes(opts.role_arn),
                session_name=frombytes(opts.session_name),
                external_id=frombytes(opts.external_id),
                load_frequency=opts.load_frequency,
                background_writes=opts.background_writes,
                default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
                proxy_options={'scheme': frombytes(opts.proxy_options.scheme),
                               'host': frombytes(opts.proxy_options.host),
                               'port': opts.proxy_options.port,
                               'username': frombytes(
                                   opts.proxy_options.username),
                               'password': frombytes(
                                   opts.proxy_options.password)}
            ),)
        )

    @property
    def region(self):
        """
        The AWS region this filesystem connects to.
        """
        return frombytes(self.s3fs.region())