author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_s3fs.pyx
parent    Initial commit. (diff)
Adding upstream version 18.2.2. (upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_s3fs.pyx')
-rw-r--r--  src/arrow/python/pyarrow/_s3fs.pyx | 284
1 file changed, 284 insertions(+), 0 deletions(-)
diff --git a/src/arrow/python/pyarrow/_s3fs.pyx b/src/arrow/python/pyarrow/_s3fs.pyx
new file mode 100644
index 000000000..5829d74d3
--- /dev/null
+++ b/src/arrow/python/pyarrow/_s3fs.pyx
@@ -0,0 +1,284 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: language_level = 3
+
+from pyarrow.lib cimport (check_status, pyarrow_wrap_metadata,
+                          pyarrow_unwrap_metadata)
+from pyarrow.lib import frombytes, tobytes, KeyValueMetadata
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_fs cimport *
+from pyarrow._fs cimport FileSystem
+
+
+cpdef enum S3LogLevel:
+    Off = <int8_t> CS3LogLevel_Off
+    Fatal = <int8_t> CS3LogLevel_Fatal
+    Error = <int8_t> CS3LogLevel_Error
+    Warn = <int8_t> CS3LogLevel_Warn
+    Info = <int8_t> CS3LogLevel_Info
+    Debug = <int8_t> CS3LogLevel_Debug
+    Trace = <int8_t> CS3LogLevel_Trace
+
+
+def initialize_s3(S3LogLevel log_level=S3LogLevel.Fatal):
+    """
+    Initialize S3 support
+
+    Parameters
+    ----------
+    log_level : S3LogLevel
+        level of logging
+    """
+    cdef CS3GlobalOptions options
+    options.log_level = <CS3LogLevel> log_level
+    check_status(CInitializeS3(options))
+
+
+def finalize_s3():
+    check_status(CFinalizeS3())
+
+
+cdef class S3FileSystem(FileSystem):
+    """
+    S3-backed FileSystem implementation
+
+    If neither access_key nor secret_key is provided, and role_arn is also
+    not provided, the filesystem attempts to initialize from AWS environment
+    variables; otherwise both access_key and secret_key must be provided.
+
+    If role_arn is provided instead of access_key and secret_key, temporary
+    credentials will be fetched by issuing a request to STS to assume the
+    specified role.
+
+    Note: S3 buckets are special and the operations available on them may be
+    limited or more expensive than desired.
+
+    Parameters
+    ----------
+    access_key : str, default None
+        AWS Access Key ID. Pass None to use the standard AWS environment
+        variables and/or configuration file.
+    secret_key : str, default None
+        AWS Secret Access key. Pass None to use the standard AWS environment
+        variables and/or configuration file.
+    session_token : str, default None
+        AWS Session Token. Optional; required only if access_key and
+        secret_key are temporary credentials obtained from STS.
+    anonymous : boolean, default False
+        Whether to connect anonymously if access_key and secret_key are None.
+        If true, will not attempt to look up credentials using standard AWS
+        configuration methods.
+    role_arn : str, default None
+        AWS Role ARN. If provided instead of access_key and secret_key,
+        temporary credentials will be fetched by assuming this role.
+    session_name : str, default None
+        An optional identifier for the assumed role session.
+    external_id : str, default None
+        An optional unique identifier that might be required when you assume
+        a role in another account.
+    load_frequency : int, default 900
+        The frequency (in seconds) with which temporary credentials from an
+        assumed role session will be refreshed.
+    region : str, default 'us-east-1'
+        AWS region to connect to.
+    scheme : str, default 'https'
+        S3 connection transport scheme.
+    endpoint_override : str, default None
+        Override the S3 endpoint with a connect string, e.g. "localhost:9000".
+    background_writes : boolean, default True
+        Whether file writes will be issued in the background, without
+        blocking.
+    default_metadata : mapping or KeyValueMetadata, default None
+        Default metadata for open_output_stream. This will be ignored if
+        non-empty metadata is passed to open_output_stream.
+    proxy_options : dict or str, default None
+        If a proxy is used, provide the options here. Supported options are:
+        'scheme' (str: 'http' or 'https'; required), 'host' (str; required),
+        'port' (int; required), 'username' (str; optional),
+        'password' (str; optional).
+        A proxy URI (str) can also be provided, in which case these options
+        will be derived from the provided URI.
+        The following are equivalent::
+
+            S3FileSystem(proxy_options='http://username:password@localhost:8020')
+            S3FileSystem(proxy_options={'scheme': 'http', 'host': 'localhost',
+                                        'port': 8020, 'username': 'username',
+                                        'password': 'password'})
+    """
+
+    cdef:
+        CS3FileSystem* s3fs
+
+    def __init__(self, *, access_key=None, secret_key=None, session_token=None,
+                 bint anonymous=False, region=None, scheme=None,
+                 endpoint_override=None, bint background_writes=True,
+                 default_metadata=None, role_arn=None, session_name=None,
+                 external_id=None, load_frequency=900, proxy_options=None):
+        cdef:
+            CS3Options options
+            shared_ptr[CS3FileSystem] wrapped
+
+        if access_key is not None and secret_key is None:
+            raise ValueError(
+                'In order to initialize with explicit credentials both '
+                'access_key and secret_key must be provided, '
+                '`secret_key` is not set.'
+            )
+        elif access_key is None and secret_key is not None:
+            raise ValueError(
+                'In order to initialize with explicit credentials both '
+                'access_key and secret_key must be provided, '
+                '`access_key` is not set.'
+            )
+
+        elif session_token is not None and (access_key is None or
+                                            secret_key is None):
+            raise ValueError(
+                'In order to initialize a session with temporary credentials, '
+                'both secret_key and access_key must be provided in addition '
+                'to session_token.'
+            )
+
+        elif (access_key is not None or secret_key is not None):
+            if anonymous:
+                raise ValueError(
+                    'Cannot pass anonymous=True together with access_key '
+                    'and secret_key.')
+
+            if role_arn:
+                raise ValueError(
+                    'Cannot provide role_arn with access_key and secret_key')
+
+            if session_token is None:
+                session_token = ""
+
+            options = CS3Options.FromAccessKey(
+                tobytes(access_key),
+                tobytes(secret_key),
+                tobytes(session_token)
+            )
+        elif anonymous:
+            if role_arn:
+                raise ValueError(
+                    'Cannot provide role_arn with anonymous=True')
+
+            options = CS3Options.Anonymous()
+        elif role_arn:
+
+            options = CS3Options.FromAssumeRole(
+                tobytes(role_arn),
+                tobytes(session_name),
+                tobytes(external_id),
+                load_frequency
+            )
+        else:
+            options = CS3Options.Defaults()
+
+        if region is not None:
+            options.region = tobytes(region)
+        if scheme is not None:
+            options.scheme = tobytes(scheme)
+        if endpoint_override is not None:
+            options.endpoint_override = tobytes(endpoint_override)
+        if background_writes is not None:
+            options.background_writes = background_writes
+        if default_metadata is not None:
+            if not isinstance(default_metadata, KeyValueMetadata):
+                default_metadata = KeyValueMetadata(default_metadata)
+            options.default_metadata = pyarrow_unwrap_metadata(
+                default_metadata)
+
+        if proxy_options is not None:
+            if isinstance(proxy_options, dict):
+                options.proxy_options.scheme = tobytes(proxy_options["scheme"])
+                options.proxy_options.host = tobytes(proxy_options["host"])
+                options.proxy_options.port = proxy_options["port"]
+                proxy_username = proxy_options.get("username", None)
+                if proxy_username:
+                    options.proxy_options.username = tobytes(proxy_username)
+                proxy_password = proxy_options.get("password", None)
+                if proxy_password:
+                    options.proxy_options.password = tobytes(proxy_password)
+            elif isinstance(proxy_options, str):
+                options.proxy_options = GetResultValue(
+                    CS3ProxyOptions.FromUriString(tobytes(proxy_options)))
+            else:
+                raise TypeError(
+                    "'proxy_options': expected 'dict' or 'str', "
+                    f"got {type(proxy_options)} instead.")
+
+        with nogil:
+            wrapped = GetResultValue(CS3FileSystem.Make(options))
+
+        self.init(<shared_ptr[CFileSystem]> wrapped)
+
+    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
+        FileSystem.init(self, wrapped)
+        self.s3fs = <CS3FileSystem*> wrapped.get()
+
+    @classmethod
+    def _reconstruct(cls, kwargs):
+        return cls(**kwargs)
+
+    def __reduce__(self):
+        cdef CS3Options opts = self.s3fs.options()
+
+        # if creds were explicitly provided, then use them
+        # else obtain them as they were last time.
+        if opts.credentials_kind == CS3CredentialsKind_Explicit:
+            access_key = frombytes(opts.GetAccessKey())
+            secret_key = frombytes(opts.GetSecretKey())
+            session_token = frombytes(opts.GetSessionToken())
+        else:
+            access_key = None
+            secret_key = None
+            session_token = None
+
+        return (
+            S3FileSystem._reconstruct, (dict(
+                access_key=access_key,
+                secret_key=secret_key,
+                session_token=session_token,
+                anonymous=(opts.credentials_kind ==
+                           CS3CredentialsKind_Anonymous),
+                region=frombytes(opts.region),
+                scheme=frombytes(opts.scheme),
+                endpoint_override=frombytes(opts.endpoint_override),
+                role_arn=frombytes(opts.role_arn),
+                session_name=frombytes(opts.session_name),
+                external_id=frombytes(opts.external_id),
+                load_frequency=opts.load_frequency,
+                background_writes=opts.background_writes,
+                default_metadata=pyarrow_wrap_metadata(opts.default_metadata),
+                proxy_options={'scheme': frombytes(opts.proxy_options.scheme),
+                               'host': frombytes(opts.proxy_options.host),
+                               'port': opts.proxy_options.port,
+                               'username': frombytes(
+                                   opts.proxy_options.username),
+                               'password': frombytes(
+                                   opts.proxy_options.password)}
+            ),)
+        )
+
+    @property
+    def region(self):
+        """
+        The AWS region this filesystem connects to.
+        """
+        return frombytes(self.s3fs.region())
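Below is a minimal usage sketch of the module added above, assuming (as in upstream pyarrow) that initialize_s3, finalize_s3, S3LogLevel and S3FileSystem are re-exported from pyarrow.fs; the credentials, bucket and key are hypothetical placeholders:

    import pyarrow.fs as pafs

    # Initialize S3 support and choose how verbose the AWS SDK logging is.
    pafs.initialize_s3(pafs.S3LogLevel.Error)

    # Explicit credentials: access_key and secret_key must be given together;
    # session_token is only needed when they are temporary STS credentials.
    s3 = pafs.S3FileSystem(
        access_key="AKIA...",   # hypothetical placeholder
        secret_key="...",       # hypothetical placeholder
        region="us-east-1",
    )

    # Any filesystem call issues real requests against the (hypothetical) bucket.
    info = s3.get_file_info("my-bucket/some/key.parquet")

    # Release S3 resources once the application is done with S3.
    pafs.finalize_s3()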
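The constructor's other credential modes, and the two equivalent proxy_options forms shown in the class docstring, would look roughly like this (the role ARN, account id and proxy address are made-up values):

    import pyarrow.fs as pafs

    # Anonymous access: no credential lookup is attempted, so only publicly
    # readable buckets are reachable.
    anon = pafs.S3FileSystem(anonymous=True, region="us-east-1")

    # Assume-role access: temporary credentials are fetched from STS and
    # refreshed every load_frequency seconds.
    assumed = pafs.S3FileSystem(
        role_arn="arn:aws:iam::123456789012:role/example-role",  # made up
        session_name="pyarrow-session",
        external_id="example-external-id",
        load_frequency=900,
    )

    # proxy_options as a URI string, or as the equivalent dict.
    via_uri = pafs.S3FileSystem(
        anonymous=True,
        proxy_options="http://username:password@localhost:8020",
    )
    via_dict = pafs.S3FileSystem(
        anonymous=True,
        proxy_options={"scheme": "http", "host": "localhost", "port": 8020,
                       "username": "username", "password": "password"},
    )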
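Because __reduce__ and _reconstruct round-trip the constructor options, the filesystem can be pickled and sent to worker processes; a small sketch of that round trip:

    import pickle
    import pyarrow.fs as pafs

    fs1 = pafs.S3FileSystem(anonymous=True, region="eu-west-1")
    fs2 = pickle.loads(pickle.dumps(fs1))

    # The reconstructed filesystem carries the same options.
    assert fs2.region == "eu-west-1"
    assert fs1.equals(fs2)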