Source code for pyathena.filesystem.s3_object

# -*- coding: utf-8 -*-
from __future__ import annotations

import copy
import logging
from datetime import datetime
from typing import Any, Dict, Iterator, MutableMapping, Optional

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)  # type: ignore

# Translation table between S3 API field names (keys) and the snake_case
# names under which S3Object stores the same values (values).
#
# It is used in both directions:
#   * S3Object.__init__ keeps only the ``init`` entries whose key appears
#     here and stores them under the mapped name; keyword arguments are
#     renamed through it when they match and kept as-is otherwise.
#   * S3Object.to_api_repr iterates it to turn stored values back into
#     API field names, so the insertion order below is also the key order
#     of the dictionary that method returns.
_API_FIELD_TO_S3_OBJECT_PROPERTY = {
    "ETag": "etag",
    "CacheControl": "cache_control",
    "ContentDisposition": "content_disposition",
    "ContentEncoding": "content_encoding",
    "ContentLanguage": "content_language",
    "ContentLength": "content_length",
    "ContentType": "content_type",
    "Expires": "expires",
    "WebsiteRedirectLocation": "website_redirect_location",
    "ServerSideEncryption": "server_side_encryption",
    "SSECustomerAlgorithm": "sse_customer_algorithm",
    "SSEKMSKeyId": "sse_kms_key_id",
    "BucketKeyEnabled": "bucket_key_enabled",
    "StorageClass": "storage_class",
    "ObjectLockMode": "object_lock_mode",
    "ObjectLockRetainUntilDate": "object_lock_retain_until_date",
    "ObjectLockLegalHoldStatus": "object_lock_legal_hold_status",
    "Metadata": "metadata",
    "LastModified": "last_modified",
}


class S3ObjectType:
    """String tags naming the two kinds of entry an S3 path can resolve to.

    The values tell directory-like entries apart from file-like entries
    when S3 paths are handled through the S3FileSystem interface.
    """

    # Tag for a directory-like entry.
    S3_OBJECT_TYPE_DIRECTORY: str = "directory"
    # Tag for a regular object (file).
    S3_OBJECT_TYPE_FILE: str = "file"
class S3StorageClass:
    """String constants for Amazon S3 storage classes.

    A storage class determines the availability, durability and cost
    profile of a stored object; each one targets a different access
    pattern.

    Storage classes:
        - STANDARD: Default storage for frequently accessed data
        - REDUCED_REDUNDANCY: Lower cost, reduced durability (deprecated)
        - STANDARD_IA: Infrequently accessed data with rapid retrieval
        - ONEZONE_IA: Lower cost IA storage in single availability zone
        - INTELLIGENT_TIERING: Automatic tiering between frequent/infrequent
        - GLACIER: Archive storage for long-term backup
        - DEEP_ARCHIVE: Lowest cost archive storage
        - GLACIER_IR: Archive with faster retrieval than standard Glacier
        - OUTPOSTS: Storage on AWS Outposts

    Two further values, BUCKET and DIRECTORY, are defined alongside them.
    NOTE(review): these look like markers for entries that are not real
    objects (buckets / directory prefixes) -- confirm against the callers.

    See Also:
        AWS S3 storage classes documentation:
        https://docs.aws.amazon.com/s3/latest/userguide/storage-class-intro.html
    """

    S3_STORAGE_CLASS_STANDARD: str = "STANDARD"
    S3_STORAGE_CLASS_REDUCED_REDUNDANCY: str = "REDUCED_REDUNDANCY"
    S3_STORAGE_CLASS_STANDARD_IA: str = "STANDARD_IA"
    S3_STORAGE_CLASS_ONEZONE_IA: str = "ONEZONE_IA"
    S3_STORAGE_CLASS_INTELLIGENT_TIERING: str = "INTELLIGENT_TIERING"
    S3_STORAGE_CLASS_GLACIER: str = "GLACIER"
    S3_STORAGE_CLASS_DEEP_ARCHIVE: str = "DEEP_ARCHIVE"
    S3_STORAGE_CLASS_OUTPOSTS: str = "OUTPOSTS"
    S3_STORAGE_CLASS_GLACIER_IR: str = "GLACIER_IR"

    S3_STORAGE_CLASS_BUCKET: str = "BUCKET"
    S3_STORAGE_CLASS_DIRECTORY: str = "DIRECTORY"
class S3Object(MutableMapping[str, Any]):
    """Represents an S3 object with metadata and filesystem-like properties.

    The class offers a dictionary-like view over S3 object metadata and maps
    S3 API field names (``ContentType``) onto pythonic property names
    (``content_type``). Every entry is reachable both as ``obj["name"]`` and
    as ``obj.name``; all entries live in the instance ``__dict__``.

    Missing entries are lenient by design: ``obj["missing"]`` and
    ``obj.missing`` evaluate to ``None`` instead of raising. Dunder names
    (``__setstate__``, ``__deepcopy__`` ...) are the exception -- they raise
    ``AttributeError`` when absent so that ``copy`` and ``pickle`` keep
    working.

    Example:
        >>> s3_obj = S3Object({"ContentType": "text/csv", "ContentLength": 1024})
        >>> print(s3_obj.content_type)  # "text/csv"
        >>> print(s3_obj["content_length"])  # 1024
        >>> s3_obj.storage_class = "STANDARD_IA"

    Note:
        This class is primarily used internally by S3FileSystem for
        representing S3 objects in filesystem operations.
    """

    def __init__(
        self,
        init: Dict[str, Any],
        **kwargs,
    ) -> None:
        """Populate the object from an S3 API response and extra fields.

        Args:
            init: S3 API response fields. Only keys listed in
                ``_API_FIELD_TO_S3_OBJECT_PROPERTY`` are kept (under their
                mapped names); ``Size`` is additionally honoured for the
                ``size`` / ``content_length`` entries. A falsy value skips
                this step entirely.
            **kwargs: Extra entries. Keys that match an API field name are
                renamed through the same table, all others are stored
                unchanged. They are applied after ``init`` and therefore
                override it.
        """
        if init:
            filtered = {
                _API_FIELD_TO_S3_OBJECT_PROPERTY[k]: v
                for k, v in init.items()
                if k in _API_FIELD_TO_S3_OBJECT_PROPERTY
            }
            if "StorageClass" not in init:
                # https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
                # Amazon S3 returns this header for all objects except for
                # S3 Standard storage class objects.
                filtered[_API_FIELD_TO_S3_OBJECT_PROPERTY["StorageClass"]] = (
                    S3StorageClass.S3_STORAGE_CLASS_STANDARD
                )
            super().update(filtered)
            if "Size" in init:
                self.content_length = init["Size"]
                self.size = init["Size"]
            elif "ContentLength" in init:
                # content_length was already stored through ``filtered``.
                self.size = init["ContentLength"]
            else:
                self.content_length = 0
                self.size = 0
        super().update({_API_FIELD_TO_S3_OBJECT_PROPERTY.get(k, k): v for k, v in kwargs.items()})
        # ``name`` is "<bucket>/<key>", or just the bucket when no key is set.
        if self.get("key") is None:
            self.name = self.get("bucket")
        else:
            self.name = f"{self.get('bucket')}/{self.get('key')}"

    def get(self, key: str, default: Any = None) -> Any:
        """Return the entry stored under ``key``, or ``default`` if absent.

        The lookup goes straight to ``__dict__``: the inherited
        ``Mapping.get`` relies on ``__getitem__`` raising ``KeyError``,
        which this class never does, so it would silently ignore
        ``default``.
        """
        return self.__dict__.get(key, default)

    def __getitem__(self, item: str) -> Any:
        """Return the entry stored under ``item``; ``None`` when missing."""
        return self.__dict__.get(item)

    def __getattr__(self, item: str):
        """Resolve unknown attributes as entries; ``None`` when missing.

        Raises:
            AttributeError: For absent dunder names. Returning ``None`` for
                them makes ``hasattr(obj, "__setstate__")`` true, which
                leads ``copy.deepcopy`` and ``pickle`` to call ``None``.
        """
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return self.get(item)

    def __setitem__(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``."""
        self.__dict__[key] = value

    def __setattr__(self, attr: str, value: Any) -> None:
        """Attribute assignment is the same operation as item assignment."""
        self[attr] = value

    def __delitem__(self, key: str) -> None:
        """Remove the entry ``key``.

        Raises:
            KeyError: If ``key`` is not stored.
        """
        del self.__dict__[key]

    def __contains__(self, key: object) -> bool:
        """Report whether ``key`` is actually stored.

        The inherited ``Mapping.__contains__`` probes ``self[key]`` and
        waits for ``KeyError``; since ``__getitem__`` never raises, it would
        answer ``True`` for every key.
        """
        return key in self.__dict__

    def __iter__(self) -> Iterator[str]:
        """Iterate over the stored entry names."""
        return iter(self.__dict__.keys())

    def __len__(self) -> int:
        """Return the number of stored entries."""
        return len(self.__dict__)

    def __str__(self):
        """Return the string form of the underlying dictionary."""
        return str(self.__dict__)

    def to_dict(self) -> Dict[str, Any]:
        """Convert S3Object to dictionary representation.

        Returns:
            Deep copy of the object's attributes as a dictionary.
        """
        return copy.deepcopy(self.__dict__)

    def to_api_repr(self) -> Dict[str, Any]:
        """Build a dictionary keyed by S3 API field names.

        ``ETag``, ``ContentLength`` and ``LastModified`` are left out, as
        are entries whose stored value is ``None``.

        Returns:
            Mapping from API field name to the stored value.
        """
        fields = {}
        for k, v in _API_FIELD_TO_S3_OBJECT_PROPERTY.items():
            if k in ["ETag", "ContentLength", "LastModified"]:
                # Excluded from API representation
                continue
            field = self.get(v)
            if field is not None:
                fields[k] = field
        return fields
class S3PutObject:
    """Read-only view over the response of an S3 PUT object call.

    The constructor copies the relevant response fields into private
    attributes; each one is exposed through a property of the same name
    without the leading underscore. Fields absent from the response read
    as ``None``.

    Attributes:
        expiration: Object expiration time if lifecycle policy applies.
        version_id: Version ID if bucket versioning is enabled.
        etag: Entity tag for the uploaded object.
        server_side_encryption: Server-side encryption method used.
        checksum_crc32 / checksum_crc32c / checksum_sha1 / checksum_sha256:
            Checksums for data integrity verification.

    Note:
        This class is used internally by S3FileSystem operations and is
        typically not instantiated directly by users.
    """

    def __init__(self, response: Dict[str, Any]) -> None:
        """Capture the fields of ``response`` that this class exposes.

        Args:
            response: Mapping of S3 API field names to values.
        """
        field = response.get
        self._expiration: Optional[str] = field("Expiration")
        self._version_id: Optional[str] = field("VersionId")
        self._etag: Optional[str] = field("ETag")
        self._checksum_crc32: Optional[str] = field("ChecksumCRC32")
        self._checksum_crc32c: Optional[str] = field("ChecksumCRC32C")
        self._checksum_sha1: Optional[str] = field("ChecksumSHA1")
        self._checksum_sha256: Optional[str] = field("ChecksumSHA256")
        self._server_side_encryption = field("ServerSideEncryption")
        self._sse_customer_algorithm = field("SSECustomerAlgorithm")
        self._sse_customer_key_md5 = field("SSECustomerKeyMD5")
        self._sse_kms_key_id = field("SSEKMSKeyId")
        self._sse_kms_encryption_context = field("SSEKMSEncryptionContext")
        self._bucket_key_enabled = field("BucketKeyEnabled")
        self._request_charged = field("RequestCharged")

    @property
    def expiration(self) -> Optional[str]:
        """Value of the ``Expiration`` response field."""
        return self._expiration

    @property
    def version_id(self) -> Optional[str]:
        """Value of the ``VersionId`` response field."""
        return self._version_id

    @property
    def etag(self) -> Optional[str]:
        """Value of the ``ETag`` response field."""
        return self._etag

    @property
    def checksum_crc32(self) -> Optional[str]:
        """Value of the ``ChecksumCRC32`` response field."""
        return self._checksum_crc32

    @property
    def checksum_crc32c(self) -> Optional[str]:
        """Value of the ``ChecksumCRC32C`` response field."""
        return self._checksum_crc32c

    @property
    def checksum_sha1(self) -> Optional[str]:
        """Value of the ``ChecksumSHA1`` response field."""
        return self._checksum_sha1

    @property
    def checksum_sha256(self) -> Optional[str]:
        """Value of the ``ChecksumSHA256`` response field."""
        return self._checksum_sha256

    @property
    def server_side_encryption(self) -> Optional[str]:
        """Value of the ``ServerSideEncryption`` response field."""
        return self._server_side_encryption

    @property
    def sse_customer_algorithm(self) -> Optional[str]:
        """Value of the ``SSECustomerAlgorithm`` response field."""
        return self._sse_customer_algorithm

    @property
    def sse_customer_key_md5(self) -> Optional[str]:
        """Value of the ``SSECustomerKeyMD5`` response field."""
        return self._sse_customer_key_md5

    @property
    def sse_kms_key_id(self) -> Optional[str]:
        """Value of the ``SSEKMSKeyId`` response field."""
        return self._sse_kms_key_id

    @property
    def sse_kms_encryption_context(self) -> Optional[str]:
        """Value of the ``SSEKMSEncryptionContext`` response field."""
        return self._sse_kms_encryption_context

    @property
    def bucket_key_enabled(self) -> Optional[bool]:
        """Value of the ``BucketKeyEnabled`` response field."""
        return self._bucket_key_enabled

    @property
    def request_charged(self) -> Optional[str]:
        """Value of the ``RequestCharged`` response field."""
        return self._request_charged

    def to_dict(self) -> Dict[str, Any]:
        """Return a deep copy of the instance attributes.

        The keys are the private attribute names, i.e. they carry the
        leading underscore (``"_etag"``, ``"_version_id"`` ...).
        """
        snapshot = copy.deepcopy(self.__dict__)
        return snapshot
class S3MultipartUpload:
    """Read-only view over the response that starts an S3 multipart upload.

    Multipart uploads send a large object in chunks. This class keeps the
    identifiers of such an upload together with the encryption settings and
    lifecycle information found in the response. Fields absent from the
    response read as ``None``.

    Attributes:
        bucket: S3 bucket name for the upload.
        key: Object key being uploaded.
        upload_id: Unique identifier for the multipart upload.
        server_side_encryption: Encryption method applied to the upload.
        abort_date / abort_rule_id: Lifecycle rule information for upload
            cleanup.

    Note:
        Used internally by S3FileSystem for large file upload operations.
    """

    def __init__(self, response: Dict[str, Any]) -> None:
        """Capture the fields of ``response`` that this class exposes.

        Args:
            response: Mapping of S3 API field names to values.
        """
        field = response.get
        self._abort_date = field("AbortDate")
        self._abort_rule_id = field("AbortRuleId")
        self._bucket = field("Bucket")
        self._key = field("Key")
        self._upload_id = field("UploadId")
        self._server_side_encryption = field("ServerSideEncryption")
        self._sse_customer_algorithm = field("SSECustomerAlgorithm")
        self._sse_customer_key_md5 = field("SSECustomerKeyMD5")
        self._sse_kms_key_id = field("SSEKMSKeyId")
        self._sse_kms_encryption_context = field("SSEKMSEncryptionContext")
        self._bucket_key_enabled = field("BucketKeyEnabled")
        self._request_charged = field("RequestCharged")
        self._checksum_algorithm = field("ChecksumAlgorithm")

    @property
    def abort_date(self) -> Optional[datetime]:
        """Value of the ``AbortDate`` response field."""
        return self._abort_date

    @property
    def abort_rule_id(self) -> Optional[str]:
        """Value of the ``AbortRuleId`` response field."""
        return self._abort_rule_id

    @property
    def bucket(self) -> Optional[str]:
        """Value of the ``Bucket`` response field."""
        return self._bucket

    @property
    def key(self) -> Optional[str]:
        """Value of the ``Key`` response field."""
        return self._key

    @property
    def upload_id(self) -> Optional[str]:
        """Value of the ``UploadId`` response field."""
        return self._upload_id

    @property
    def server_side_encryption(self) -> Optional[str]:
        """Value of the ``ServerSideEncryption`` response field."""
        return self._server_side_encryption

    @property
    def sse_customer_algorithm(self) -> Optional[str]:
        """Value of the ``SSECustomerAlgorithm`` response field."""
        return self._sse_customer_algorithm

    @property
    def sse_customer_key_md5(self) -> Optional[str]:
        """Value of the ``SSECustomerKeyMD5`` response field."""
        return self._sse_customer_key_md5

    @property
    def sse_kms_key_id(self) -> Optional[str]:
        """Value of the ``SSEKMSKeyId`` response field."""
        return self._sse_kms_key_id

    @property
    def sse_kms_encryption_context(self) -> Optional[str]:
        """Value of the ``SSEKMSEncryptionContext`` response field."""
        return self._sse_kms_encryption_context

    @property
    def bucket_key_enabled(self) -> Optional[bool]:
        """Value of the ``BucketKeyEnabled`` response field."""
        return self._bucket_key_enabled

    @property
    def request_charged(self) -> Optional[str]:
        """Value of the ``RequestCharged`` response field."""
        return self._request_charged

    @property
    def checksum_algorithm(self) -> Optional[str]:
        """Value of the ``ChecksumAlgorithm`` response field."""
        return self._checksum_algorithm
class S3MultipartUploadPart:
    """Read-only view over the response for one part of a multipart upload.

    Every part carries its own metadata: the part number, an entity tag,
    checksums and encryption details. The class stores that metadata and
    can render the subset needed to reference the part in an API call.

    Attributes:
        part_number: The sequential part number (1-based).
        etag: Entity tag for this specific part.
        checksum_*: Various integrity checksums for the part data.
        server_side_encryption: Encryption settings for this part.

    Note:
        Parts must be at least 5MB except for the last part.
        Used internally by S3FileSystem for chunked upload operations.
    """

    def __init__(self, part_number: int, response: Dict[str, Any]) -> None:
        """Capture the part number and the relevant fields of ``response``.

        When ``response`` carries a truthy ``CopyPartResult`` mapping, the
        entity tag, the checksums and ``LastModified`` are read from that
        nested mapping. Otherwise the entity tag and checksums come from
        the top level of ``response`` and ``last_modified`` is ``None``.

        Args:
            part_number: Number identifying this part within the upload.
            response: Mapping of S3 API field names to values.
        """
        self._part_number = part_number
        self._copy_source_version_id: Optional[str] = response.get("CopySourceVersionId")

        nested = response.get("CopyPartResult")
        # Pick the mapping that holds the ETag and checksum fields.
        origin = nested if nested else response
        self._last_modified: Optional[datetime] = (
            nested.get("LastModified") if nested else None
        )
        self._etag: Optional[str] = origin.get("ETag")
        self._checksum_crc32: Optional[str] = origin.get("ChecksumCRC32")
        self._checksum_crc32c: Optional[str] = origin.get("ChecksumCRC32C")
        self._checksum_sha1: Optional[str] = origin.get("ChecksumSHA1")
        self._checksum_sha256: Optional[str] = origin.get("ChecksumSHA256")

        # Encryption and billing fields always come from the top level.
        self._server_side_encryption: Optional[str] = response.get("ServerSideEncryption")
        self._sse_customer_algorithm: Optional[str] = response.get("SSECustomerAlgorithm")
        self._sse_customer_key_md5: Optional[str] = response.get("SSECustomerKeyMD5")
        self._sse_kms_key_id: Optional[str] = response.get("SSEKMSKeyId")
        self._bucket_key_enabled: Optional[bool] = response.get("BucketKeyEnabled")
        self._request_charged: Optional[str] = response.get("RequestCharged")

    @property
    def part_number(self) -> int:
        """Part number given to the constructor."""
        return self._part_number

    @property
    def copy_source_version_id(self) -> Optional[str]:
        """Value of the ``CopySourceVersionId`` response field."""
        return self._copy_source_version_id

    @property
    def last_modified(self) -> Optional[datetime]:
        """``LastModified`` of the copy result; ``None`` without one."""
        return self._last_modified

    @property
    def etag(self) -> Optional[str]:
        """Value of the ``ETag`` field."""
        return self._etag

    @property
    def checksum_crc32(self) -> Optional[str]:
        """Value of the ``ChecksumCRC32`` field."""
        return self._checksum_crc32

    @property
    def checksum_crc32c(self) -> Optional[str]:
        """Value of the ``ChecksumCRC32C`` field."""
        return self._checksum_crc32c

    @property
    def checksum_sha1(self) -> Optional[str]:
        """Value of the ``ChecksumSHA1`` field."""
        return self._checksum_sha1

    @property
    def checksum_sha256(self) -> Optional[str]:
        """Value of the ``ChecksumSHA256`` field."""
        return self._checksum_sha256

    @property
    def server_side_encryption(self) -> Optional[str]:
        """Value of the ``ServerSideEncryption`` response field."""
        return self._server_side_encryption

    @property
    def sse_customer_algorithm(self) -> Optional[str]:
        """Value of the ``SSECustomerAlgorithm`` response field."""
        return self._sse_customer_algorithm

    @property
    def sse_customer_key_md5(self) -> Optional[str]:
        """Value of the ``SSECustomerKeyMD5`` response field."""
        return self._sse_customer_key_md5

    @property
    def sse_kms_key_id(self) -> Optional[str]:
        """Value of the ``SSEKMSKeyId`` response field."""
        return self._sse_kms_key_id

    @property
    def bucket_key_enabled(self) -> Optional[bool]:
        """Value of the ``BucketKeyEnabled`` response field."""
        return self._bucket_key_enabled

    @property
    def request_charged(self) -> Optional[str]:
        """Value of the ``RequestCharged`` response field."""
        return self._request_charged

    def to_api_repr(self) -> Dict[str, Any]:
        """Render the part as a dictionary keyed by S3 API field names.

        Returns:
            The entity tag, the four checksums and the part number.
            Values that were never set are included as ``None``.
        """
        api_fields: Dict[str, Any] = {}
        api_fields["ETag"] = self.etag
        api_fields["ChecksumCRC32"] = self.checksum_crc32
        api_fields["ChecksumCRC32C"] = self.checksum_crc32c
        api_fields["ChecksumSHA1"] = self.checksum_sha1
        api_fields["ChecksumSHA256"] = self.checksum_sha256
        api_fields["PartNumber"] = self.part_number
        return api_fields
class S3CompleteMultipartUpload:
    """Read-only view over the response that completes a multipart upload.

    The response describes the finished object: where it lives, its
    version, its entity tag, checksums and the encryption applied. Fields
    absent from the response read as ``None``.

    Attributes:
        location: Final S3 URL of the completed object.
        bucket: S3 bucket containing the object.
        key: Final object key.
        version_id: Version ID if bucket versioning is enabled.
        etag: Final entity tag of the complete object.
        server_side_encryption: Encryption applied to the final object.

    Note:
        This represents the successful completion of a multipart upload.
        Used internally by S3FileSystem operations.
    """

    def __init__(self, response: Dict[str, Any]) -> None:
        """Capture the fields of ``response`` that this class exposes.

        Args:
            response: Mapping of S3 API field names to values.
        """
        field = response.get
        self._location: Optional[str] = field("Location")
        self._bucket: Optional[str] = field("Bucket")
        self._key: Optional[str] = field("Key")
        self._expiration: Optional[str] = field("Expiration")
        self._version_id: Optional[str] = field("VersionId")
        self._etag: Optional[str] = field("ETag")
        self._checksum_crc32: Optional[str] = field("ChecksumCRC32")
        self._checksum_crc32c: Optional[str] = field("ChecksumCRC32C")
        self._checksum_sha1: Optional[str] = field("ChecksumSHA1")
        self._checksum_sha256: Optional[str] = field("ChecksumSHA256")
        self._server_side_encryption = field("ServerSideEncryption")
        self._sse_kms_key_id = field("SSEKMSKeyId")
        self._bucket_key_enabled = field("BucketKeyEnabled")
        self._request_charged = field("RequestCharged")

    @property
    def location(self) -> Optional[str]:
        """Value of the ``Location`` response field."""
        return self._location

    @property
    def bucket(self) -> Optional[str]:
        """Value of the ``Bucket`` response field."""
        return self._bucket

    @property
    def key(self) -> Optional[str]:
        """Value of the ``Key`` response field."""
        return self._key

    @property
    def expiration(self) -> Optional[str]:
        """Value of the ``Expiration`` response field."""
        return self._expiration

    @property
    def version_id(self) -> Optional[str]:
        """Value of the ``VersionId`` response field."""
        return self._version_id

    @property
    def etag(self) -> Optional[str]:
        """Value of the ``ETag`` response field."""
        return self._etag

    @property
    def checksum_crc32(self) -> Optional[str]:
        """Value of the ``ChecksumCRC32`` response field."""
        return self._checksum_crc32

    @property
    def checksum_crc32c(self) -> Optional[str]:
        """Value of the ``ChecksumCRC32C`` response field."""
        return self._checksum_crc32c

    @property
    def checksum_sha1(self) -> Optional[str]:
        """Value of the ``ChecksumSHA1`` response field."""
        return self._checksum_sha1

    @property
    def checksum_sha256(self) -> Optional[str]:
        """Value of the ``ChecksumSHA256`` response field."""
        return self._checksum_sha256

    @property
    def server_side_encryption(self) -> Optional[str]:
        """Value of the ``ServerSideEncryption`` response field."""
        return self._server_side_encryption

    @property
    def sse_kms_key_id(self) -> Optional[str]:
        """Value of the ``SSEKMSKeyId`` response field."""
        return self._sse_kms_key_id

    @property
    def bucket_key_enabled(self) -> Optional[bool]:
        """Value of the ``BucketKeyEnabled`` response field."""
        return self._bucket_key_enabled

    @property
    def request_charged(self) -> Optional[str]:
        """Value of the ``RequestCharged`` response field."""
        return self._request_charged

    def to_dict(self):
        """Return a deep copy of the instance attributes.

        The keys are the private attribute names, i.e. they carry the
        leading underscore (``"_location"``, ``"_etag"`` ...).
        """
        snapshot = copy.deepcopy(self.__dict__)
        return snapshot