"""
Accessing media associated with a CLDF dataset.
You can iterate over the :class:`.File` objects associated with media using the :class:`.Media`
wrapper:
.. code-block:: python
from pycldf.media import Media
for f in Media(dataset):
if f.mimetype.type == 'audio':
f.save(directory)
or instantiate a :class:`.File` from a :class:`pycldf.orm.Object`:
.. code-block:: python
from pycldf.media import File
f = File.from_dataset(dataset, dataset.get_object('MediaTable', 'theid'))
"""
import io
import json
import base64
from typing import Union, TYPE_CHECKING, Optional, Callable
import pathlib
import zipfile
import functools
import mimetypes
import collections
import urllib.parse
import urllib.request
from collections.abc import Generator
from csvw.metadata import Table, Column
from csvw.datatypes import anyURI
from pycldf import orm
from pycldf.fileutil import splitfile, catfile, PathType
if TYPE_CHECKING:
from pycldf import Dataset # pragma: no cover
from pycldf.dataset import RowType # pragma: no cover
from pycldf.validators import DatasetValidator # pragma: no cover
# Names exported by ``from pycldf.media import *``.
__all__ = ['Mimetype', 'MediaTable', 'File']
# Type of media content handed around in this module: `str` for decoded text, `bytes` otherwise.
StrOrBytes = Union[str, bytes]
class File:  # pylint: disable=too-many-instance-attributes
    """
    A `File` represents a row in a MediaTable, providing functionality to access the contents.

    :ivar row: The underlying row `dict`.
    :ivar id: The identifier of the item, read from the row via `media.filename_col`.
    :ivar url: The URL (as `str`) to download the content associated with the item, or `None`.
    :ivar scheme: The scheme of `url` (`'file'` if the URL has none), or `None` without URL.
    :ivar parsed_url: The parsed `url`, or `None` if the item has no URL.
    :ivar relpath: The path component of `url` without leading slashes, or `None` without URL.
    :ivar path_in_zip: Name of the member holding the content if `url` points to a ZIP archive.

    `File` supports media files within ZIP archives as specified in CLDF 1.2. I.e.

    - :meth:`read` will extract the specified file from a downloaded ZIP archive and
    - :meth:`save` will write a (deflated) ZIP archive containing the specified file as single
      member.
    """
    def __init__(self, media: 'MediaTable', row: 'RowType'):
        """
        :param media: The `MediaTable` wrapper supplying column descriptions, URL readers and \
        the dataset directory.
        :param row: The row of the media table describing this item.
        """
        self.row: 'RowType' = row
        self.id: str = row[media.filename_col.name]
        self._mimetype: str = row[media.mimetype_col.name]
        self.url: Optional[str] = None
        self.scheme: Optional[str] = None
        # URL-derived attributes always exist, so methods can test them even for items
        # without URL instead of failing with an AttributeError.
        self.parsed_url: Optional[urllib.parse.ParseResult] = None
        self.relpath: Optional[str] = None
        self.url_reader = media.url_reader
        self.path_in_zip: Optional[str] \
            = row.get(media.path_in_zip_col.name) if media.path_in_zip_col else None
        self._dsdir: pathlib.Path = media.ds.directory

        if media.url_col:
            # 1. Look for a downloadUrl property:
            self.url = row[media.url_col.name]
        elif media.id_col and media.id_col.valueUrl:
            # 2. Expand valueUrl property:
            self.url = media.id_col.valueUrl.expand(**row)

        if self.url:
            self.url = anyURI.to_string(self.url)
            self.parsed_url = urllib.parse.urlparse(self.url)
            # URLs without scheme are treated as paths of local files.
            self.scheme = self.parsed_url.scheme or 'file'
            self.relpath = self.parsed_url.path.lstrip('/')

    @classmethod
    def from_dataset(
            cls, ds: 'Dataset', row_or_object: Union[dict, orm.Media]) -> 'File':
        """
        Factory method to instantiate a `File` bypassing the `Media` wrapper.

        :param ds: The dataset the media item belongs to.
        :param row_or_object: Either the row `dict` or an `orm.Media` object (in which case its \
        `data` attribute is used as row).
        """
        return cls(
            MediaTable(ds),
            row_or_object.data if isinstance(row_or_object, orm.Media) else row_or_object)

    def __getitem__(self, item) -> dict:
        """
        Access to the underlying row `dict`.
        """
        return self.row[item]

    @functools.cached_property
    def mimetype(self) -> 'Mimetype':
        """
        The `Mimetype` object associated with the item.

        While the mediaType column is required by the CLDF spec, this might be disabled.
        If so, we use "out-of-band" methods to figure out a mimetype for the file.

        The result is computed once per instance; note that the last fallback for HTTP(S) URLs
        issues a HEAD request.
        """
        if self._mimetype:
            # We take the mimetype reported in the dataset as authoritative.
            return Mimetype(self._mimetype)
        # If no mimetype is specified explicitly, we fall back to mimetype detection mechanisms:
        if self.scheme in ['file', 'http', 'https']:
            # Guess from the filename extension in the URL path.
            mt, _ = mimetypes.guess_type(self.parsed_url.path)
            if mt:
                return Mimetype(mt)
        if self.scheme == 'data':
            # For data URLs the media type is whatever precedes the first comma.
            mt, _, _ = self.parsed_url.path.partition(',')
            if mt.endswith(';base64'):
                mt = mt.replace(';base64', '').strip()
            if mt:
                return Mimetype(mt)
            # There's an explicit default mimetype for data URLs!
            return Mimetype('text/plain;charset=US-ASCII')
        if self.scheme in ['http', 'https']:
            # Ask the server, without downloading the content.
            res = urllib.request.urlopen(  # too lazy to mock with with. pylint: disable=R1732
                urllib.request.Request(self.url, method="HEAD"))
            mt = res.headers.get('Content-Type')
            if mt:
                return Mimetype(mt)
        return Mimetype('application/octet-stream')

    def local_path(self, d: Optional[pathlib.Path] = None) -> Optional[pathlib.Path]:
        """
        :param d: Directory (or file, see below) where downloaded media is stored. If `None`, \
        the path is computed relative to the dataset directory, which is only possible for \
        items with a `file` URL.
        :return: The expected path of the file in the directory `d` in the case of files \
        downloaded using the `downloadmedia` command. If `d` is a file it is accepted as full \
        local path if it has the same name as the filename in the URL. `None` if `d` is `None` \
        and the item has no `file` URL.
        """
        if d is None:
            if self.scheme == 'file':
                return self._dsdir / urllib.parse.unquote(self.relpath)
            return None
        if d.is_file() and self.parsed_url:
            if d.name == pathlib.Path(self.parsed_url.path).name:
                # Support the use case of files in zip archives, where the archives are available
                # locally, e.g. as download of a separate media file deposit from Zenodo.
                return d
        # Zipped content is stored as archive, anything else with an extension fitting its type.
        zip_ext = '.zip' if self.path_in_zip else (self.mimetype.extension or '')
        return d.joinpath(f'{self.id}{zip_ext}')

    def read_json(self, d=None):
        """
        Reads JSON data.

        :param d: Passed on to :meth:`read`.
        :return: The parsed JSON content.
        """
        assert self.mimetype.subtype.endswith('json')
        return json.loads(self.read(d=d))

    def read(self, d: Optional[pathlib.Path] = None) -> Optional[StrOrBytes]:
        """
        :param d: A local path where the file has been saved before - as expected by `local_path`. \
        If `None`, the content will be read from the file's URL.
        :return: The content, converted according to the item's mimetype, or `None` if no \
        content could be located.
        :raises ValueError: If the content of a non-zipped item is to be read from a URL with a \
        scheme for which no reader is registered.
        """
        if self.path_in_zip:
            zipcontent = None
            if d:
                zipcontent = self.local_path(d).read_bytes()
            if zipcontent is None and self.url:
                # The URL points to the archive, not to the member we are interested in.
                zipcontent = self.url_reader[self.scheme](
                    self.parsed_url, Mimetype('application/zip'))
            if zipcontent:
                with zipfile.ZipFile(io.BytesIO(zipcontent)) as zf:
                    return self.mimetype.read(zf.read(self.path_in_zip))
            return None  # pragma: no cover

        if d:
            return self.mimetype.read(self.local_path(d).read_bytes())
        if self.url:
            try:
                return self.url_reader[self.scheme](self.parsed_url, self.mimetype)
            except KeyError as e:
                raise ValueError(f'Unsupported URL scheme: {self.scheme}') from e
        return None  # pragma: no cover

    def save(self, d: pathlib.Path) -> pathlib.Path:
        """
        Saves the content of `File` in directory `d`.

        An already existing target file is left untouched.

        :return: Path of the local file where the content has been saved.

        .. note::

            We use the identifier of the media item (i.e. the `id` attribute) as stem of the
            file to be written.
        """
        p = self.local_path(d)
        if not p.exists():
            if self.path_in_zip:
                # Re-pack the single member we are interested in into a new archive.
                with zipfile.ZipFile(p, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
                    zf.writestr(self.path_in_zip, self.mimetype.write(self.read()))
            else:
                self.mimetype.write(self.read(), p)
        return p
# Alias: the usage example in the module docstring imports the table wrapper under the name `Media`.
Media = MediaTable
class Mimetype:
    """
    A media type specification.

    :ivar string: The full specification as passed into the constructor.
    :ivar type: The (main) type as `str`.
    :ivar subtype: The subtype as `str`.
    :ivar encoding: The encoding specified with a "charset" parameter, `'utf8'` if there is none.
    """
    def __init__(self, s):
        """
        :param s: A media type specification like `text/plain; charset=latin-1`.
        """
        self.string = s
        mtype, _, params = self.string.partition(';')
        self.type, _, self.subtype = mtype.strip().partition('/')
        # for compatibility reasons
        if self.type == 'audio' and self.subtype.lower() == 'wav':
            self.subtype = 'x-wav'

        self.encoding = 'utf8'
        # The charset may be any of several ";"-separated parameters, its name is
        # case-insensitive and its value may be quoted.
        for param in params.split(';'):
            name, sep, value = param.partition('=')
            if sep and name.strip().lower() == 'charset':
                value = value.strip().strip('"\'').strip()
                if value:
                    self.encoding = value
                break

    def __eq__(self, other):
        """
        A `Mimetype` equals a `str` if this is its full specification, and any object with the
        same `type` and `subtype` - i.e. parameters are ignored when comparing two mimetypes.
        """
        if isinstance(other, str):
            return self.string == other
        try:
            return (self.type, self.subtype) == (other.type, other.subtype)
        except AttributeError:
            # Let Python try the reflected comparison rather than blowing up.
            return NotImplemented

    # Instances compare equal to objects with different hashes (e.g. `str`), thus are kept
    # unhashable. This is what defining `__eq__` alone does implicitly.
    __hash__ = None

    @property
    def is_text(self) -> bool:
        """Whether the mimetype describes text, and hence data should be read as str."""
        return self.type == 'text'

    @property
    def extension(self) -> Optional[str]:
        """Return a suitable filename extension for the mimetype, `None` if there is none."""
        return mimetypes.guess_extension(f'{self.type}/{self.subtype}')

    def read(self, data: Union[str, bytes]) -> Union[str, bytes]:
        """
        Read data, inferring the encoding from the mimetype.

        :param data: The raw content.
        :return: `data` decoded to `str` for text mimetypes, `data` unchanged otherwise (or if \
        it already is a `str`).
        """
        if self.is_text and not isinstance(data, str):
            return data.decode(self.encoding)
        return data

    def write(
            self,
            data: Union[str, bytes],
            p: Optional[pathlib.Path] = None) -> Union[int, str, bytes]:
        """
        The mimetype dictates how/if to encode data.

        :param data: The content; `str` content of text mimetypes is encoded, anything else is \
        passed through.
        :param p: Path of a file to write the content to.
        :return: The number of bytes written if `p` is given, the (encoded) content otherwise.
        """
        # Mirror `read`: only text which actually is a `str` needs encoding.
        res = data.encode(self.encoding) if self.is_text and isinstance(data, str) else data
        return p.write_bytes(res) if p else res
def read_data_url(url: urllib.parse.ParseResult, mimetype: Mimetype) -> StrOrBytes:
    """
    Read data from a data:// URL.

    :param url: The parsed data URL; its path is expected to have the form \
    `<mediatype>[;base64],<data>`.
    :param mimetype: The mimetype controlling how the data is converted.
    :return: The payload as returned by `mimetype.read`; URL-encoded text payloads are \
    percent-decoded.
    """
    spec, _, data = url.path.partition(',')
    if spec.endswith(';base64'):
        # Percent-escapes can only be part of the URL serialization of the base64 string
        # ("%" is not in the base64 alphabet), so they are resolved before decoding. The
        # decoded payload is the content proper and must not be percent-decoded again.
        return mimetype.read(base64.b64decode(urllib.parse.unquote(data)))
    data = mimetype.read(data)
    if mimetype.is_text:
        data = urllib.parse.unquote(data)
    return data
def read_file_url(d: PathType, url: urllib.parse.ParseResult, mimetype: Mimetype) -> StrOrBytes:
    """
    Read data from a file:// URL.

    :param d: The base location the URL path is resolved against: a directory path or - as \
    `str` - an HTTP(S) URL.
    :param url: The parsed URL; only its path is used, interpreted relative to `d`.
    :param mimetype: The mimetype controlling how the content is converted.
    :return: The content as returned by `mimetype.read`.
    """
    # Leading slashes are dropped, turning the URL path into a relative one.
    relative = url.path.lstrip('/')
    if isinstance(d, str):  # pragma: no cover
        # We are accessing media of dataset which has been accessed over HTTP.
        base = urllib.parse.urlparse(d)
        assert base.scheme.startswith('http')
        remote = urllib.parse.urljoin(d, relative)
        return read_http_url(urllib.parse.urlparse(remote), mimetype)
    local = d.joinpath(urllib.parse.unquote(relative))
    return mimetype.read(local.read_bytes())
def read_http_url(url: urllib.parse.ParseResult, mimetype: Mimetype) -> StrOrBytes:
    """
    Read data from an HTTP URL.

    :param url: The parsed URL to retrieve.
    :param mimetype: The mimetype controlling how the response body is converted.
    :return: The response body as returned by `mimetype.read`.
    """
    target = urllib.parse.urlunparse(url)
    payload = urllib.request.urlopen(target).read()
    return mimetype.read(payload)