1
0
Fork 0

Adding upstream version 2.41.

Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
This commit is contained in:
Daniel Baumann 2025-06-21 11:26:35 +02:00
parent 9586bb3c92
commit c36e531662
Signed by: daniel.baumann
GPG key ID: BCC918A2ABD66424
3661 changed files with 2164106 additions and 0 deletions

View file

@ -0,0 +1,35 @@
if BUILD_PYLIBMOUNT
pylibmountexecdir = $(pyexecdir)/libmount
# Please, don't use $pythondir for the scripts. We have to use the same
# directory for binary stuff as well as for the scripts otherwise it's
# not possible to install 32-bit and 64-bit version on the same system.
pylibmountexec_LTLIBRARIES = pylibmount.la
pylibmountexec_PYTHON = libmount/python/__init__.py
pylibmount_la_SOURCES = \
libmount/python/pylibmount.c \
libmount/python/pylibmount.h \
libmount/python/fs.c \
libmount/python/tab.c
if LINUX
pylibmount_la_SOURCES += libmount/python/context.c
endif
pylibmount_la_LIBADD = libmount.la $(PYTHON_LIBS)
pylibmount_la_CFLAGS = \
$(AM_CFLAGS) \
$(PYTHON_CFLAGS) $(PYTHON_WARN_CFLAGS) \
-I$(ul_libmount_incdir) \
-fno-strict-aliasing #-ggdb3 -O0
pylibmount_la_LDFLAGS = \
-avoid-version -module -shared -export-dynamic
dist_check_SCRIPTS += libmount/python/test_mount_context.py
dist_check_SCRIPTS += libmount/python/test_mount_tab.py
dist_check_SCRIPTS += libmount/python/test_mount_tab_update.py
endif # BUILD_PYLIBMOUNT

View file

@ -0,0 +1,2 @@
from .pylibmount import *

1202
libmount/python/context.c Normal file

File diff suppressed because it is too large Load diff

857
libmount/python/fs.c Normal file
View file

@ -0,0 +1,857 @@
/*
* Python bindings for the libmount library.
*
* Copyright (C) 2013, Red Hat, Inc. All rights reserved.
* Written by Ondrej Oprala and Karel Zak
*
* This file is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This file is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this file; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
/*
* TODO:
* mnt_fs_match_{source,target}
* mnt_fs_get_{attribute,option}
*/
#include "pylibmount.h"
#include <errno.h>
#define Fs_HELP "Fs(source=None, root=None, target=None, fstype=None, options=None, attributes=None, freq=0, passno=0)"
static PyMemberDef Fs_members[] = {
{NULL}
};
static PyObject *Fs_get_tag(FsObject *self)
{
const char *tag = NULL, *val = NULL;
PyObject *result;
if (mnt_fs_get_tag(self->fs, &tag, &val) != 0)
return NULL;
result = Py_BuildValue("(ss)", tag, val);
if (!result)
PyErr_SetString(PyExc_RuntimeError, CONSTRUCT_ERR);
return result;
}
static PyObject *Fs_get_id(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_id(self->fs));
}
static PyObject *Fs_get_parent_id(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_parent_id(self->fs));
}
static PyObject *Fs_get_devno(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_devno(self->fs));
}
static void _dump_debug_string(const char *lead, const char *s, char quote)
{
/* PySys_WriteStdout() will automatically truncate any '%s' token
* longer than a certain length (documented as 1000 bytes, but we
* give ourselves some margin here just in case). The only way I
* know to get around this is to print such strings in bite-sized
* chunks.
*/
static const unsigned int _PY_MAX_LEN = 900;
static const char *_PY_MAX_LEN_FMT = "%.900s";
unsigned int len;
if (lead != NULL)
PySys_WriteStdout("%s", lead);
if (quote != 0)
PySys_WriteStdout("%c", quote);
for (len = strlen(s); len > _PY_MAX_LEN; len -= _PY_MAX_LEN, s += _PY_MAX_LEN)
PySys_WriteStdout(_PY_MAX_LEN_FMT, s);
if (len > 0)
PySys_WriteStdout(_PY_MAX_LEN_FMT, s);
if (quote != 0)
PySys_WriteStdout("%c\n", quote);
else
PySys_WriteStdout("\n");
}
#define Fs_print_debug_HELP "print_debug()\n\n"
static PyObject *Fs_print_debug(FsObject *self)
{
PySys_WriteStdout("------ fs: %p\n", self->fs);
_dump_debug_string("source: ", mnt_fs_get_source(self->fs), 0);
_dump_debug_string("target: ", mnt_fs_get_target(self->fs), 0);
_dump_debug_string("fstype: ", mnt_fs_get_fstype(self->fs), 0);
if (mnt_fs_get_options(self->fs))
_dump_debug_string("optstr: ", mnt_fs_get_options(self->fs), 0);
if (mnt_fs_get_vfs_options(self->fs))
_dump_debug_string("VFS-optstr: ", mnt_fs_get_vfs_options(self->fs), 0);
if (mnt_fs_get_fs_options(self->fs))
_dump_debug_string("FS-opstr: ", mnt_fs_get_fs_options(self->fs), 0);
if (mnt_fs_get_user_options(self->fs))
_dump_debug_string("user-optstr: ", mnt_fs_get_user_options(self->fs), 0);
if (mnt_fs_get_optional_fields(self->fs))
_dump_debug_string("optional-fields: ", mnt_fs_get_optional_fields(self->fs), '\'');
if (mnt_fs_get_attributes(self->fs))
_dump_debug_string("attributes: ", mnt_fs_get_attributes(self->fs), 0);
if (mnt_fs_get_root(self->fs))
_dump_debug_string("root: ", mnt_fs_get_root(self->fs), 0);
if (mnt_fs_get_swaptype(self->fs))
_dump_debug_string("swaptype: ", mnt_fs_get_swaptype(self->fs), 0);
if (mnt_fs_get_size(self->fs))
PySys_WriteStdout("size: %jd\n", mnt_fs_get_size(self->fs));
if (mnt_fs_get_usedsize(self->fs))
PySys_WriteStdout("usedsize: %jd\n", mnt_fs_get_usedsize(self->fs));
if (mnt_fs_get_priority(self->fs))
PySys_WriteStdout("priority: %d\n", mnt_fs_get_priority(self->fs));
if (mnt_fs_get_bindsrc(self->fs))
_dump_debug_string("bindsrc: ", mnt_fs_get_bindsrc(self->fs), 0);
if (mnt_fs_get_freq(self->fs))
PySys_WriteStdout("freq: %d\n", mnt_fs_get_freq(self->fs));
if (mnt_fs_get_passno(self->fs))
PySys_WriteStdout("pass: %d\n", mnt_fs_get_passno(self->fs));
if (mnt_fs_get_id(self->fs))
PySys_WriteStdout("id: %d\n", mnt_fs_get_id(self->fs));
if (mnt_fs_get_parent_id(self->fs))
PySys_WriteStdout("parent: %d\n", mnt_fs_get_parent_id(self->fs));
if (mnt_fs_get_devno(self->fs))
PySys_WriteStdout("devno: %d:%d\n", major(mnt_fs_get_devno(self->fs)),
minor(mnt_fs_get_devno(self->fs)));
if (mnt_fs_get_tid(self->fs))
PySys_WriteStdout("tid: %d\n", mnt_fs_get_tid(self->fs));
if (mnt_fs_get_comment(self->fs))
_dump_debug_string("comment: ", mnt_fs_get_comment(self->fs), '\'');
return UL_IncRef(self);
}
/*
** Fs getters/setters
*/
static PyObject *Fs_get_comment(FsObject *self, void *closure __attribute__((unused)))
{
return PyObjectResultStr(mnt_fs_get_comment(self->fs));
}
static int Fs_set_comment(FsObject *self, PyObject *value, void *closure __attribute__((unused)))
{
char *comment = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(comment = pystos(value)))
return -1;
rc = mnt_fs_set_comment(self->fs, comment);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
/* source */
static PyObject *Fs_get_source(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_source(self->fs));
}
static int Fs_set_source(FsObject *self, PyObject *value, void *closure __attribute__((unused)))
{
char *source = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(source = pystos(value)))
return -1;
rc = mnt_fs_set_source(self->fs, source);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Fs_get_srcpath(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_srcpath(self->fs));
}
static PyObject *Fs_get_root(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_root(self->fs));
}
static int Fs_set_root(FsObject *self, PyObject *value, void *closure __attribute__((unused)))
{
char *root = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(root = pystos(value)))
return -1;
rc = mnt_fs_set_root(self->fs, root);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Fs_get_target(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_target(self->fs));
}
static int Fs_set_target(FsObject *self, PyObject *value, void *closure __attribute__((unused)))
{
char *target = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(target = pystos(value)))
return -1;
rc = mnt_fs_set_target(self->fs, target);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Fs_get_fstype(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_fstype(self->fs));
}
static int Fs_set_fstype(FsObject *self, PyObject *value,
void *closure __attribute__((unused)))
{
char *fstype = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(fstype = pystos(value)))
return -1;
rc = mnt_fs_set_fstype(self->fs, fstype);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Fs_get_options(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_options(self->fs));
}
static int Fs_set_options(FsObject *self, PyObject *value,
void *closure __attribute__((unused)))
{
char *options = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(options = pystos(value)))
return -1;
rc = mnt_fs_set_options(self->fs, options);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Fs_get_vfs_options(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_vfs_options(self->fs));
}
static PyObject *Fs_get_optional_fields(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_optional_fields(self->fs));
}
static PyObject *Fs_get_fs_options(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_fs_options(self->fs));
}
static PyObject *Fs_get_user_options(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_user_options(self->fs));
}
static PyObject *Fs_get_attributes(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_attributes(self->fs));
}
static int Fs_set_attributes(FsObject *self, PyObject *value,
void *closure __attribute__((unused)))
{
char *attributes = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(attributes = pystos(value)))
return -1;
rc = mnt_fs_set_attributes(self->fs, attributes);
if (rc) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Fs_get_freq(FsObject *self, void *closure __attribute__((unused)))
{
return PyObjectResultInt(mnt_fs_get_freq(self->fs));
}
static int Fs_set_freq(FsObject *self, PyObject *value,
void *closure __attribute__((unused)))
{
int freq = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!PyLong_Check(value)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return -1;
}
freq = PyLong_AsLong(value);
if (freq == -1 && PyErr_Occurred()) {
PyErr_SetString(PyExc_RuntimeError, "type conversion failed");
return -1;
}
return mnt_fs_set_freq(self->fs, freq);
}
static PyObject *Fs_get_passno(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_passno(self->fs));
}
static int Fs_set_passno(FsObject *self, PyObject *value, void *closure __attribute__((unused)))
{
int passno = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!PyLong_Check(value)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return -1;
}
passno = PyLong_AsLong(value);
if (passno == -1 && PyErr_Occurred()) {
PyErr_SetString(PyExc_RuntimeError, "type conversion failed");
return -1;
}
return mnt_fs_set_passno(self->fs, passno);
}
static PyObject *Fs_get_swaptype(FsObject *self)
{
return PyObjectResultStr(mnt_fs_get_swaptype(self->fs));
}
static PyObject *Fs_get_size(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_size(self->fs));
}
static PyObject *Fs_get_usedsize(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_usedsize(self->fs));
}
static PyObject *Fs_get_priority(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_priority(self->fs));
}
#define Fs_get_propagation_HELP "get_propagation(flags)\n\n\
Note that this function set flags to zero if not found any propagation flag\n\
in mountinfo file. The kernel default is MS_PRIVATE, this flag is not stored\n\
in the mountinfo file.\n\
\n\
Returns self or raises an exception in case of an error."
static PyObject *Fs_get_propagation(FsObject *self)
{
unsigned long flags;
int rc;
rc = mnt_fs_get_propagation(self->fs, &flags);
return rc ? UL_RaiseExc(-rc) : PyObjectResultInt(flags);
}
static PyObject *Fs_get_tid(FsObject *self)
{
return PyObjectResultInt(mnt_fs_get_tid(self->fs));
}
#define Fs_is_kernel_HELP "is_kernel()\n\nReturns 1 if the filesystem " \
"description is read from kernel e.g. /proc/mounts."
static PyObject *Fs_is_kernel(FsObject *self)
{
return PyBool_FromLong(mnt_fs_is_kernel(self->fs));
}
#define Fs_is_netfs_HELP "is_netfs()\n\nReturns 1 if the filesystem is " \
"a network filesystem"
static PyObject *Fs_is_netfs(FsObject *self)
{
return PyBool_FromLong(mnt_fs_is_netfs(self->fs));
}
#define Fs_is_pseudofs_HELP "is_pseudofs()\n\nReturns 1 if the filesystem is "\
"a pseudo fs type (proc, cgroups)"
static PyObject *Fs_is_pseudofs(FsObject *self)
{
return PyBool_FromLong(mnt_fs_is_pseudofs(self->fs));
}
#define Fs_is_swaparea_HELP "is_swaparea()\n\nReturns 1 if the filesystem " \
"uses \"swap\" as a type"
static PyObject *Fs_is_swaparea(FsObject *self)
{
return PyBool_FromLong(mnt_fs_is_swaparea(self->fs));
}
#define Fs_append_attributes_HELP "append_attributes(optstr)\n\n" \
"Appends mount attributes."
static PyObject *Fs_append_attributes(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"optstr", NULL};
char *optstr = NULL;
int rc;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &optstr)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_fs_append_attributes(self->fs, optstr);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Fs_append_options_HELP "append_options(optstr)\n\n" \
"Parses (splits) optstr and appends results to VFS, " \
"FS and userspace lists of options."
static PyObject *Fs_append_options(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"optstr", NULL};
char *optstr = NULL;
int rc;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &optstr)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_fs_append_options(self->fs, optstr);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Fs_prepend_attributes_HELP "prepend_attributes(optstr)\n\n" \
"Prepends mount attributes."
static PyObject *Fs_prepend_attributes(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"optstr", NULL};
char *optstr = NULL;
int rc;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &optstr)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_fs_prepend_attributes(self->fs, optstr);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Fs_prepend_options_HELP "prepend_options(optstr)\n\n" \
"Parses (splits) optstr and prepends results to VFS, " \
"FS and userspace lists of options."
static PyObject *Fs_prepend_options(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"optstr", NULL};
char *optstr = NULL;
int rc;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &optstr)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_fs_prepend_options(self->fs, optstr);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Fs_match_fstype_HELP "match_fstype(pattern)\n\n" \
"pattern: filesystem name or comma delimited list(string) of names\n\n" \
"The pattern list of filesystem can be prefixed with a global\n" \
"\"no\" prefix to invert matching of the whole list. The \"no\" could\n" \
"also be used for individual items in the pattern list. So,\n" \
"\"nofoo,bar\" has the same meaning as \"nofoo,nobar\".\n" \
"\"bar\" : \"nofoo,bar\" -> False (global \"no\" prefix)\n" \
"\"bar\" : \"foo,bar\" -> True\n" \
"\"bar\" : \"foo,nobar\" -> False\n\n" \
"Returns True if type is matching, else False."
static PyObject *Fs_match_fstype(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"pattern", NULL};
char *pattern = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &pattern)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyBool_FromLong(mnt_fs_match_fstype(self->fs, pattern));
}
#define Fs_match_options_HELP "match_options(options)\n\n" \
"options: comma delimited list of options (and nooptions)\n" \
"Returns True if fs type is matching to options else False."
static PyObject *Fs_match_options(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"options", NULL};
char *options = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &options)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyBool_FromLong(mnt_fs_match_options(self->fs, options));
}
#define Fs_streq_srcpath_HELP "streq_srcpath(srcpath)\n\n" \
"Compares fs source path with path. The trailing slash is ignored.\n" \
"Returns True if fs source path equal to path, otherwise False."
static PyObject *Fs_streq_srcpath(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"srcpath", NULL};
char *srcpath = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &srcpath)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyBool_FromLong(mnt_fs_streq_srcpath(self->fs, srcpath));
}
#define Fs_streq_target_HELP "streq_target(target)\n\n" \
"Compares fs target path with path. The trailing slash is ignored.\n" \
"See also Fs.match_target().\n" \
"Returns True if fs target path equal to path, otherwise False."
static PyObject *Fs_streq_target(FsObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"target", NULL};
char *target = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &target)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyBool_FromLong(mnt_fs_streq_target(self->fs, target));
}
#define Fs_copy_fs_HELP "copy_fs(dest=None)\n\n" \
"If dest is None, a new object is created, if any fs " \
"field is already set, then the field is NOT overwritten."
static PyObject *Fs_copy_fs(FsObject *self, PyObject *args, PyObject *kwds);
static PyMethodDef Fs_methods[] = {
{"get_propagation", (PyCFunction)Fs_get_propagation, METH_NOARGS, Fs_get_propagation_HELP},
{"mnt_fs_append_attributes", (PyCFunction)Fs_append_attributes, METH_VARARGS|METH_KEYWORDS, Fs_append_attributes_HELP},
{"append_options", (PyCFunction)Fs_append_options, METH_VARARGS|METH_KEYWORDS, Fs_append_options_HELP},
{"mnt_fs_prepend_attributes", (PyCFunction)Fs_prepend_attributes, METH_VARARGS|METH_KEYWORDS, Fs_prepend_attributes_HELP},
{"prepend_options", (PyCFunction)Fs_prepend_options, METH_VARARGS|METH_KEYWORDS, Fs_prepend_options_HELP},
{"copy_fs", (PyCFunction)Fs_copy_fs, METH_VARARGS|METH_KEYWORDS, Fs_copy_fs_HELP},
{"is_kernel", (PyCFunction)Fs_is_kernel, METH_NOARGS, Fs_is_kernel_HELP},
{"is_netfs", (PyCFunction)Fs_is_netfs, METH_NOARGS, Fs_is_netfs_HELP},
{"is_pseudofs", (PyCFunction)Fs_is_pseudofs, METH_NOARGS, Fs_is_pseudofs_HELP},
{"is_swaparea", (PyCFunction)Fs_is_swaparea, METH_NOARGS, Fs_is_swaparea_HELP},
{"match_fstype", (PyCFunction)Fs_match_fstype, METH_VARARGS|METH_KEYWORDS, Fs_match_fstype_HELP},
{"match_options", (PyCFunction)Fs_match_options, METH_VARARGS|METH_KEYWORDS, Fs_match_options_HELP},
{"streq_srcpath", (PyCFunction)Fs_streq_srcpath, METH_VARARGS|METH_KEYWORDS, Fs_streq_srcpath_HELP},
{"streq_target", (PyCFunction)Fs_streq_target, METH_VARARGS|METH_KEYWORDS, Fs_streq_target_HELP},
{"print_debug", (PyCFunction)Fs_print_debug, METH_NOARGS, Fs_print_debug_HELP},
{NULL}
};
static void Fs_destructor(FsObject *self)
{
DBG(FS, pymnt_debug_h(self->fs, "destructor py-obj: %p, py-refcnt=%d",
self, (int) ((PyObject *) self)->ob_refcnt));
mnt_unref_fs(self->fs);
PyFree(self);
}
static PyObject *Fs_new(PyTypeObject *type, PyObject *args __attribute__((unused)),
PyObject *kwds __attribute__((unused)))
{
FsObject *self = (FsObject*)type->tp_alloc(type, 0);
if (self) {
self->fs = NULL;
DBG(FS, pymnt_debug_h(self, "new"));
}
return (PyObject *) self;
}
static int Fs_init(FsObject *self, PyObject *args, PyObject *kwds)
{
char *source = NULL, *root = NULL, *target = NULL;
char *fstype = NULL, *options = NULL, *attributes =NULL;
int freq = 0; int passno = 0;
int rc = 0;
char *kwlist[] = {
"source", "root", "target",
"fstype", "options", "attributes",
"freq", "passno", NULL
};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ssssssii", kwlist,
&source, &root, &target, &fstype, &options,
&attributes, &freq, &passno)) {
PyErr_SetString(PyExc_TypeError, "Invalid type");
return -1;
}
DBG(FS, pymnt_debug_h(self, "init"));
if (self->fs)
mnt_unref_fs(self->fs);
self->fs = mnt_new_fs(); /* new FS with refcount=1 */
if (source && (rc = mnt_fs_set_source(self->fs, source))) {
PyErr_SetString(PyExc_MemoryError, MEMORY_ERR);
return rc;
}
if (root && (rc = mnt_fs_set_root(self->fs, root))) {
PyErr_SetString(PyExc_MemoryError, MEMORY_ERR);
return rc;
}
if (target && (rc = mnt_fs_set_target(self->fs, target))) {
PyErr_SetString(PyExc_MemoryError, MEMORY_ERR);
return rc;
}
if (fstype && (rc = mnt_fs_set_fstype(self->fs, fstype))) {
PyErr_SetString(PyExc_MemoryError, MEMORY_ERR);
return rc;
}
if (options && (rc = mnt_fs_set_options(self->fs, options))) {
PyErr_SetString(PyExc_MemoryError, MEMORY_ERR);
return rc;
}
if (attributes && (rc = mnt_fs_set_attributes(self->fs, attributes))) {
PyErr_SetString(PyExc_MemoryError, MEMORY_ERR);
return rc;
}
mnt_fs_set_freq(self->fs, freq);
mnt_fs_set_passno(self->fs, passno);
mnt_fs_set_userdata(self->fs, self); /* store a pointer to self, convenient when resetting the table */
return 0;
}
/*
* missing:
* attribute
* option
*/
static PyGetSetDef Fs_getseters[] = {
{"id", (getter)Fs_get_id, NULL, "mountinfo[1]: ID", NULL},
{"parent", (getter)Fs_get_parent_id, NULL, "mountinfo[2]: parent", NULL},
{"devno", (getter)Fs_get_devno, NULL, "mountinfo[3]: st_dev", NULL},
{"comment", (getter)Fs_get_comment, (setter)Fs_set_comment, "fstab entry comment", NULL},
{"source", (getter)Fs_get_source, (setter)Fs_set_source, "fstab[1], mountinfo[10], swaps[1]: source dev, file, dir or TAG", NULL},
{"srcpath", (getter)Fs_get_srcpath, NULL, "mount source path or NULL in case of error or when the path is not defined.", NULL},
{"root", (getter)Fs_get_root, (setter)Fs_set_root, "mountinfo[4]: root of the mount within the FS", NULL},
{"target", (getter)Fs_get_target, (setter)Fs_set_target, "mountinfo[5], fstab[2]: mountpoint", NULL},
{"fstype", (getter)Fs_get_fstype, (setter)Fs_set_fstype, "mountinfo[9], fstab[3]: filesystem type", NULL},
{"options", (getter)Fs_get_options, (setter)Fs_set_options, "fstab[4]: merged options", NULL},
{"vfs_options", (getter)Fs_get_vfs_options, NULL, "mountinfo[6]: fs-independent (VFS) options", NULL},
{"opt_fields", (getter)Fs_get_optional_fields, NULL, "mountinfo[7]: optional fields", NULL},
{"fs_options", (getter)Fs_get_fs_options, NULL, "mountinfo[11]: fs-dependent options", NULL},
{"usr_options", (getter)Fs_get_user_options, NULL, "userspace mount options", NULL},
{"attributes", (getter)Fs_get_attributes, (setter)Fs_set_attributes, "mount attributes", NULL},
{"freq", (getter)Fs_get_freq, (setter)Fs_set_freq, "fstab[5]: dump frequency in days", NULL},
{"passno", (getter)Fs_get_passno, (setter)Fs_set_passno, "fstab[6]: pass number on parallel fsck", NULL},
{"swaptype", (getter)Fs_get_swaptype, NULL, "swaps[2]: device type", NULL},
{"size", (getter)Fs_get_size, NULL, "saps[3]: swaparea size", NULL},
{"usedsize", (getter)Fs_get_usedsize, NULL, "swaps[4]: used size", NULL},
{"priority", (getter)Fs_get_priority, NULL, "swaps[5]: swap priority", NULL},
{"tag", (getter)Fs_get_tag, NULL, "(Name, Value)", NULL},
{"tid", (getter)Fs_get_tid, NULL, "/proc/<tid>/mountinfo, otherwise zero", NULL},
{NULL}
};
static PyObject *Fs_repr(FsObject *self)
{
const char *src = mnt_fs_get_source(self->fs),
*tgt = mnt_fs_get_target(self->fs),
*type = mnt_fs_get_fstype(self->fs);
return PyUnicode_FromFormat(
"<libmount.Fs object at %p, "
"source=%s, target=%s, fstype=%s>",
self,
src ? src : "None",
tgt ? tgt : "None",
type ? type : "None");
}
PyObject *PyObjectResultFs(struct libmnt_fs *fs)
{
FsObject *result;
if (!fs) {
PyErr_SetString(LibmountError, "internal exception");
return NULL;
}
result = mnt_fs_get_userdata(fs);
if (result) {
Py_INCREF(result);
DBG(FS, pymnt_debug_h(fs, "result py-obj %p: already exists, py-refcnt=%d",
result, (int) ((PyObject *) result)->ob_refcnt));
return (PyObject *) result;
}
/* Creating an encapsulating object: increment the refcount, so that code
* such as tab.next_fs() doesn't call the destructor, which would free
* our fs struct as well
*/
result = PyObject_New(FsObject, &FsType);
if (!result) {
UL_RaiseExc(ENOMEM);
return NULL;
}
Py_INCREF(result);
mnt_ref_fs(fs);
DBG(FS, pymnt_debug_h(fs, "result py-obj %p new, py-refcnt=%d",
result, (int) ((PyObject *) result)->ob_refcnt));
result->fs = fs;
mnt_fs_set_userdata(fs, result);
return (PyObject *) result;
}
static PyObject *Fs_copy_fs(FsObject *self, PyObject *args, PyObject *kwds)
{
PyObject *dest = NULL;
char *kwlist[] = {"dest", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", kwlist, &dest)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
if (PyObject_TypeCheck(dest, &FsType)) { /* existing object passed as argument */
if (!mnt_copy_fs(((FsObject *)dest)->fs, self->fs))
return NULL;
DBG(FS, pymnt_debug_h(dest, "copy data"));
return (PyObject *)dest;
}
if (dest == Py_None) { /* create new object */
FsObject *result = PyObject_New(FsObject, &FsType);
DBG(FS, pymnt_debug_h(result, "new copy"));
result->fs = mnt_copy_fs(NULL, self->fs);
mnt_fs_set_userdata(result->fs, result); /* keep a pointer to encapsulating object */
return (PyObject *)result;
}
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
PyTypeObject FsType = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "libmount.Fs",
.tp_basicsize = sizeof(FsObject),
.tp_dealloc = (destructor)Fs_destructor,
.tp_repr = (reprfunc)Fs_repr,
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.tp_doc = Fs_HELP,
.tp_methods = Fs_methods,
.tp_members = Fs_members,
.tp_getset = Fs_getseters,
.tp_init = (initproc)Fs_init,
.tp_new = Fs_new,
};
void FS_AddModuleObject(PyObject *mod)
{
if (PyType_Ready(&FsType) < 0)
return;
DBG(FS, pymnt_debug("add to module"));
Py_INCREF(&FsType);
PyModule_AddObject(mod, "Fs", (PyObject *)&FsType);
}

View file

@ -0,0 +1,44 @@
if get_option('build-python').disabled()
subdir_done()
endif
pylibmount_sources = '''
pylibmount.c
pylibmount.h
fs.c
tab.c
'''.split()
if LINUX
pylibmount_sources += 'context.c'
endif
python_module = import('python')
python = python_module.find_installation(
get_option('python'),
required : true,
disabler : true)
if meson.version().version_compare('<1.4.1')
cc.has_header('Python.h',
include_directories : include_directories(python.get_path('include')),
required : true)
endif
python.extension_module(
'pylibmount',
pylibmount_sources,
include_directories : [dir_include],
subdir : 'libmount',
dependencies : [mount_dep, python.dependency(embed: true)],
c_args : [
'-Wno-cast-function-type',
# https://github.com/util-linux/util-linux/issues/2366
python.language_version().version_compare('>=3.12') ?
[ '-Wno-error=redundant-decls' ] : [],
],
install : true)
python.install_sources(
'__init__.py',
subdir : 'libmount',
pure : false)

View file

@ -0,0 +1,326 @@
/*
* Python bindings for the libmount library.
*
* Copyright (C) 2013, Red Hat, Inc. All rights reserved.
* Written by Ondrej Oprala and Karel Zak
*
* This file is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This file is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this file; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "pylibmount.h"
/* Libmount-specific Exception class */
PyObject *LibmountError;
int pylibmount_debug_mask;
PyObject *UL_IncRef(void *killme)
{
Py_INCREF(killme);
return killme;
}
void PyFree(void *o)
{
#if PY_MAJOR_VERSION >= 3
Py_TYPE(o)->tp_free((PyObject *)o);
#else
((PyObject *)o)->ob_type->tp_free((PyObject *)o);
#endif
}
/* Demultiplexer for various possible error conditions across the libmount library */
void *UL_RaiseExc(int e)
{
/* TODO: Do we need to deal with -1/1? */
switch (e) {
case ENOMEM:
PyErr_SetString(PyExc_MemoryError, strerror(e));
break;
case EINVAL:
PyErr_SetString(PyExc_TypeError, strerror(e));
break;
/* libmount-specific errors */
case MNT_ERR_NOFSTAB:
PyErr_SetString(LibmountError, "Not found required entry in fstab");
break;
case MNT_ERR_NOFSTYPE:
PyErr_SetString(LibmountError, "Failed to detect filesystem type");
break;
case MNT_ERR_NOSOURCE:
PyErr_SetString(LibmountError, "Required mount source undefined");
break;
case MNT_ERR_LOOPDEV:
PyErr_SetString(LibmountError, "Loopdev setup failed");
break;
case MNT_ERR_APPLYFLAGS:
PyErr_SetString(LibmountError, "Failed to parse/use userspace mount options");
break;
case MNT_ERR_MOUNTOPT:
PyErr_SetString(LibmountError, "Failed to apply propagation flags");
break;
case MNT_ERR_AMBIFS:
PyErr_SetString(LibmountError, "Libblkid detected more filesystems on the device");
break;
case MNT_ERR_LOOPOVERLAP:
PyErr_SetString(LibmountError, "Detected overlapping loop device that cannot be re-use");
break;
case MNT_ERR_LOCK:
PyErr_SetString(LibmountError, "Failed to lock mtab/utab or so");
break;
case MNT_ERR_NAMESPACE:
PyErr_SetString(LibmountError, "Failed to switch namespace");
break;
/* some other errno */
default:
PyErr_SetString(PyExc_Exception, strerror(e));
break;
}
return NULL;
}
/*
* General functions
*/
PyObject *PyObjectResultInt(int i)
{
PyObject *result = Py_BuildValue("i", i);
if (!result)
PyErr_SetString(PyExc_RuntimeError, CONSTRUCT_ERR);
return result;
}
PyObject *PyObjectResultStr(const char *s)
{
PyObject *result;
if (!s)
/* TODO: maybe lie about it and return "":
* which would allow for
* fs = libmount.Fs()
* fs.comment += "comment"
return Py_BuildValue("s", ""); */
Py_RETURN_NONE;
result = Py_BuildValue("s", s);
if (!result)
PyErr_SetString(PyExc_RuntimeError, CONSTRUCT_ERR);
return result;
}
/* wrapper around a common use case for PyUnicode_AsASCIIString() */
char *pystos(PyObject *pys)
{
#if PY_MAJOR_VERSION >= 3
if (!PyUnicode_Check(pys)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return (char *)PyUnicode_1BYTE_DATA(pys);
#else
if (!PyString_Check(pys)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyString_AsString(pys);
#endif
}
/*
* the libmount module
*/
#define PYLIBMOUNT_DESC \
"Python API for the util-linux libmount library.\n\n" \
"Please note that none of the classes' attributes may be deleted.\n" \
"This is not a complete mapping to the libmount C API, nor is it\n" \
"attempting to be one.\n" "Iterator functions only allow forward\n" \
"iteration for now. Context.get_{user_,}mflags() differs from the C API\n" \
"and returns the flags directly. Fs.get_tag() differs from the C API\n" \
"and returns a (tag, value) tuple. Every attribute is \"filtered\"" \
"through appropriate getters/setters, no values are set directly."
struct module_state {
PyObject *error;
};
#if PY_MAJOR_VERSION >= 3
#define GETSTATE(m) ((struct module_state*)PyModule_GetState(m))
#else
#define GETSTATE(m) (&_state)
static struct module_state _state;
#endif
static PyObject *
error_out(PyObject *m __attribute__((unused))) {
struct module_state *st = GETSTATE(m);
PyErr_SetString(st->error, "something bad happened");
return NULL;
}
static PyMethodDef pylibmount_methods[] = {
{"error_out", (PyCFunction)error_out, METH_NOARGS, NULL},
{NULL, NULL}
};
#if PY_MAJOR_VERSION >= 3
static int pylibmount_traverse(PyObject *m, visitproc visit, void *arg) {
Py_VISIT(GETSTATE(m)->error);
return 0;
}
static int pylibmount_clear(PyObject *m) {
Py_CLEAR(GETSTATE(m)->error);
return 0;
}
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"pylibmount",
NULL,
sizeof(struct module_state),
pylibmount_methods,
NULL,
pylibmount_traverse,
pylibmount_clear,
NULL
};
#define INITERROR return NULL
PyMODINIT_FUNC PyInit_pylibmount(void);
PyMODINIT_FUNC PyInit_pylibmount(void)
#else
#define INITERROR return
# ifndef PyMODINIT_FUNC
# define PyMODINIT_FUNC void
# endif
PyMODINIT_FUNC initpylibmount(void);
PyMODINIT_FUNC initpylibmount(void)
#endif
{
#if PY_MAJOR_VERSION >= 3
PyObject *m = PyModule_Create(&moduledef);
#else
PyObject *m = Py_InitModule3("pylibmount", pylibmount_methods, PYLIBMOUNT_DESC);
#endif
if (!m)
INITERROR;
/*
* init debug stuff
*/
if (!(pylibmount_debug_mask & PYMNT_DEBUG_INIT)) {
char *str = getenv("PYLIBMOUNT_DEBUG");
errno = 0;
pylibmount_debug_mask = 0;
if (str)
pylibmount_debug_mask = strtoul(str, NULL, 0);
if (errno)
pylibmount_debug_mask = 0;
pylibmount_debug_mask |= PYMNT_DEBUG_INIT;
}
if (pylibmount_debug_mask && pylibmount_debug_mask != PYMNT_DEBUG_INIT)
DBG(INIT, pymnt_debug("library debug mask: 0x%04x",
pylibmount_debug_mask));
mnt_init_debug(0);
/*
* Add module objects
*/
LibmountError = PyErr_NewException("libmount.Error", NULL, NULL);
Py_INCREF(LibmountError);
PyModule_AddObject(m, "Error", (PyObject *)LibmountError);
FS_AddModuleObject(m);
Table_AddModuleObject(m);
#ifdef __linux__
Context_AddModuleObject(m);
#endif
/*
* mount(8) userspace options masks (MNT_MAP_USERSPACE map)
*/
PyModule_AddIntConstant(m, "MNT_MS_COMMENT", MNT_MS_COMMENT);
PyModule_AddIntConstant(m, "MNT_MS_GROUP", MNT_MS_GROUP);
PyModule_AddIntConstant(m, "MNT_MS_HELPER", MNT_MS_HELPER);
PyModule_AddIntConstant(m, "MNT_MS_LOOP", MNT_MS_LOOP);
PyModule_AddIntConstant(m, "MNT_MS_NETDEV", MNT_MS_NETDEV);
PyModule_AddIntConstant(m, "MNT_MS_NOAUTO", MNT_MS_NOAUTO);
PyModule_AddIntConstant(m, "MNT_MS_NOFAIL", MNT_MS_NOFAIL);
PyModule_AddIntConstant(m, "MNT_MS_OFFSET", MNT_MS_OFFSET);
PyModule_AddIntConstant(m, "MNT_MS_OWNER", MNT_MS_OWNER);
PyModule_AddIntConstant(m, "MNT_MS_SIZELIMIT", MNT_MS_SIZELIMIT);
PyModule_AddIntConstant(m, "MNT_MS_ENCRYPTION", MNT_MS_ENCRYPTION);
PyModule_AddIntConstant(m, "MNT_MS_UHELPER", MNT_MS_UHELPER);
PyModule_AddIntConstant(m, "MNT_MS_USER", MNT_MS_USER);
PyModule_AddIntConstant(m, "MNT_MS_USERS", MNT_MS_USERS);
PyModule_AddIntConstant(m, "MNT_MS_XCOMMENT", MNT_MS_XCOMMENT);
PyModule_AddIntConstant(m, "MNT_MS_HASH_DEVICE", MNT_MS_HASH_DEVICE);
PyModule_AddIntConstant(m, "MNT_MS_ROOT_HASH", MNT_MS_ROOT_HASH);
PyModule_AddIntConstant(m, "MNT_MS_HASH_OFFSET", MNT_MS_HASH_OFFSET);
PyModule_AddIntConstant(m, "MNT_MS_ROOT_HASH_FILE", MNT_MS_ROOT_HASH_FILE);
PyModule_AddIntConstant(m, "MNT_MS_FEC_DEVICE", MNT_MS_FEC_DEVICE);
PyModule_AddIntConstant(m, "MNT_MS_FEC_OFFSET", MNT_MS_FEC_OFFSET);
PyModule_AddIntConstant(m, "MNT_MS_FEC_ROOTS", MNT_MS_FEC_ROOTS);
PyModule_AddIntConstant(m, "MNT_MS_ROOT_HASH_SIG", MNT_MS_ROOT_HASH_SIG);
/*
* mount(2) MS_* masks (MNT_MAP_LINUX map)
*/
PyModule_AddIntConstant(m, "MS_BIND", MS_BIND);
PyModule_AddIntConstant(m, "MS_DIRSYNC", MS_DIRSYNC);
PyModule_AddIntConstant(m, "MS_I_VERSION", MS_I_VERSION);
PyModule_AddIntConstant(m, "MS_MANDLOCK", MS_MANDLOCK);
PyModule_AddIntConstant(m, "MS_MGC_MSK", MS_MGC_MSK);
PyModule_AddIntConstant(m, "MS_MGC_VAL", MS_MGC_VAL);
PyModule_AddIntConstant(m, "MS_MOVE", MS_MOVE);
PyModule_AddIntConstant(m, "MS_NOATIME", MS_NOATIME);
PyModule_AddIntConstant(m, "MS_NODEV", MS_NODEV);
PyModule_AddIntConstant(m, "MS_NODIRATIME", MS_NODIRATIME);
PyModule_AddIntConstant(m, "MS_NOEXEC", MS_NOEXEC);
PyModule_AddIntConstant(m, "MS_NOSUID", MS_NOSUID);
PyModule_AddIntConstant(m, "MS_OWNERSECURE", MS_OWNERSECURE);
PyModule_AddIntConstant(m, "MS_PRIVATE", MS_PRIVATE);
PyModule_AddIntConstant(m, "MS_PROPAGATION", MS_PROPAGATION);
PyModule_AddIntConstant(m, "MS_RDONLY", MS_RDONLY);
PyModule_AddIntConstant(m, "MS_REC", MS_REC);
PyModule_AddIntConstant(m, "MS_RELATIME", MS_RELATIME);
PyModule_AddIntConstant(m, "MS_REMOUNT", MS_REMOUNT);
PyModule_AddIntConstant(m, "MS_SECURE", MS_SECURE);
PyModule_AddIntConstant(m, "MS_SHARED", MS_SHARED);
PyModule_AddIntConstant(m, "MS_SILENT", MS_SILENT);
PyModule_AddIntConstant(m, "MS_SLAVE", MS_SLAVE);
PyModule_AddIntConstant(m, "MS_STRICTATIME", MS_STRICTATIME);
PyModule_AddIntConstant(m, "MS_SYNCHRONOUS", MS_SYNCHRONOUS);
PyModule_AddIntConstant(m, "MS_UNBINDABLE", MS_UNBINDABLE);
/* Will we need these directly?
PyModule_AddIntConstant(m, "MNT_ERR_AMBIFS", MNT_ERR_AMBIFS);
PyModule_AddIntConstant(m, "MNT_ERR_APPLYFLAGS", MNT_ERR_APPLYFLAGS);
PyModule_AddIntConstant(m, "MNT_ERR_LOOPDEV", MNT_ERR_LOOPDEV);
PyModule_AddIntConstant(m, "MNT_ERR_MOUNTOPT", MNT_ERR_MOUNTOPT);
PyModule_AddIntConstant(m, "MNT_ERR_NOFSTAB", MNT_ERR_NOFSTAB);
PyModule_AddIntConstant(m, "MNT_ERR_NOFSTYPE", MNT_ERR_NOFSTYPE);
PyModule_AddIntConstant(m, "MNT_ERR_NOSOURCE", MNT_ERR_NOSOURCE);
*/
/* Still useful for functions using iterators internally */
PyModule_AddIntConstant(m, "MNT_ITER_FORWARD", MNT_ITER_FORWARD);
PyModule_AddIntConstant(m, "MNT_ITER_BACKWARD", MNT_ITER_BACKWARD);
#if PY_MAJOR_VERSION >= 3
return m;
#endif
}

View file

@ -0,0 +1,131 @@
#ifndef UTIL_LINUX_PYLIBMOUNT_H
#define UTIL_LINUX_PYLIBMOUNT_H
#include <Python.h>
#include <structmember.h>
#include "c.h"
#include "libmount.h"
#define CONFIG_PYLIBMOUNT_DEBUG
#define PYMNT_DEBUG_INIT (1 << 1)
#define PYMNT_DEBUG_TAB (1 << 2)
#define PYMNT_DEBUG_FS (1 << 3)
#define PYMNT_DEBUG_CXT (1 << 4)
#ifdef CONFIG_PYLIBMOUNT_DEBUG
# include <stdio.h>
# include <stdarg.h>
# define DBG(m, x) do { \
if ((PYMNT_DEBUG_ ## m) & pylibmount_debug_mask) { \
fprintf(stderr, "%d: pylibmount: %6s: ", getpid(), # m); \
x; \
} \
} while (0)
extern int pylibmount_debug_mask;
static inline void __attribute__ ((__format__ (__printf__, 1, 2)))
pymnt_debug(const char *mesg, ...)
{
va_list ap;
va_start(ap, mesg);
vfprintf(stderr, mesg, ap);
va_end(ap);
fputc('\n', stderr);
}
static inline void __attribute__ ((__format__ (__printf__, 2, 3)))
pymnt_debug_h(void *handler, const char *mesg, ...)
{
va_list ap;
if (handler)
fprintf(stderr, "[%p]: ", handler);
va_start(ap, mesg);
vfprintf(stderr, mesg, ap);
va_end(ap);
fputc('\n', stderr);
}
#else /* !CONFIG_PYLIBMOUNT_DEBUG */
# define DBG(m,x) do { ; } while (0)
#endif
#define NODEL_ATTR "This attribute cannot be deleted"
#define CONSTRUCT_ERR "Error during object construction"
#define ARG_ERR "Invalid number or type of arguments"
#define NOFS_ERR "No filesystems to mount"
#define MEMORY_ERR strerror(ENOMEM)
#define CONV_ERR "Type conversion failed"
/*
* fs.c
*/
typedef struct {
PyObject_HEAD
struct libmnt_fs *fs;
} FsObject;
extern PyTypeObject FsType;
extern PyObject *PyObjectResultFs(struct libmnt_fs *fs);
extern void FS_AddModuleObject(PyObject *mod);
/*
* tab.c
*/
typedef struct {
PyObject_HEAD
struct libmnt_table *tab;
struct libmnt_iter *iter;
PyObject *errcb;
} TableObject;
extern PyTypeObject TableType;
extern PyObject *PyObjectResultTab(struct libmnt_table *tab);
extern void Table_unref(struct libmnt_table *tab);
extern void Table_AddModuleObject(PyObject *mod);
extern int pymnt_table_parser_errcb(struct libmnt_table *tb, const char *filename, int line);
#ifdef __linux__
/*
* context.c
*/
typedef struct {
PyObject_HEAD
struct libmnt_context *cxt;
PyObject *table_errcb;
} ContextObjext;
extern PyTypeObject ContextType;
extern void Context_AddModuleObject(PyObject *mod);
#endif /* __linux__ */
/*
* misc
*/
extern PyObject *LibmountError;
extern PyObject *UL_IncRef(void *killme);
extern void *UL_RaiseExc(int e);
extern PyObject *PyObjectResultInt(int i);
extern PyObject *PyObjectResultStr(const char *s);
extern char *pystos(PyObject *pys);
extern void PyFree(void *o);
#endif /* UTIL_LINUX_PYLIBMOUNT */

757
libmount/python/tab.c Normal file
View file

@ -0,0 +1,757 @@
/*
* Python bindings for the libmount library.
*
* Copyright (C) 2013, Red Hat, Inc. All rights reserved.
* Written by Ondrej Oprala and Karel Zak
*
* This file is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This file is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this file; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "pylibmount.h"
static PyMemberDef Table_members[] = {
{ NULL }
};
static int Table_set_parser_errcb(TableObject *self, PyObject *func,
void *closure __attribute__((unused)))
{
PyObject *tmp;
if (!func) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!PyCallable_Check(func))
return -1;
tmp = self->errcb;
Py_INCREF(func);
self->errcb = func;
Py_XDECREF(tmp);
return 0;
}
static PyObject *Table_get_intro_comment(TableObject *self,
void *closure __attribute__((unused)))
{
return PyObjectResultStr(mnt_table_get_intro_comment(self->tab));
}
static int Table_set_intro_comment(TableObject *self, PyObject *value,
void *closure __attribute__((unused)))
{
char *comment = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(comment = pystos(value)))
return -1;
if ((rc = mnt_table_set_intro_comment(self->tab, comment))) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
static PyObject *Table_get_trailing_comment(TableObject *self,
void *closure __attribute__((unused)))
{
return PyObjectResultStr(mnt_table_get_trailing_comment(self->tab));
}
static int Table_set_trailing_comment(TableObject *self, PyObject *value,
void *closure __attribute__((unused)))
{
char *comment = NULL;
int rc = 0;
if (!value) {
PyErr_SetString(PyExc_TypeError, NODEL_ATTR);
return -1;
}
if (!(comment = pystos(value)))
return -1;
if ((rc = mnt_table_set_trailing_comment(self->tab, comment))) {
UL_RaiseExc(-rc);
return -1;
}
return 0;
}
#define Table_enable_comments_HELP "enable_comments(enable)\n\n" \
"Enables parsing of comments.\n\n" \
"The initial (intro) file comment is accessible by\n" \
"Tab.intro_comment. The intro and the comment of the first fstab" \
"entry has to be separated by blank line. The filesystem comments are\n" \
"accessible by Fs.comment. The trailing fstab comment is accessible\n" \
"by Tab.trailing_comment.\n" \
"\n" \
"<informalexample>\n" \
"<programlisting>\n" \
"#\n" \
"# Intro comment\n" \
"#\n" \
"\n" \
"# this comments belongs to the first fs\n" \
"LABEL=foo /mnt/foo auto defaults 1 2\n" \
"# this comments belongs to the second fs\n" \
"LABEL=bar /mnt/bar auto defaults 1 2 \n" \
"# trailing comment\n" \
"</programlisting>\n" \
"</informalexample>"
static PyObject *Table_enable_comments(TableObject *self, PyObject *args,
PyObject *kwds)
{
int enable = 0;
char *kwlist[] = {"enable", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "i", kwlist, &enable)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
mnt_table_enable_comments(self->tab, enable);
Py_INCREF(self);
return (PyObject *)self;
}
#define Table_replace_file_HELP "replace_file(filename)\n\n" \
"This function replaces filename with the new content from TableObject."
static PyObject *Table_replace_file(TableObject *self, PyObject *args, PyObject *kwds)
{
int rc;
char *filename = NULL;
char *kwlist[] = {"filename", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &filename)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_replace_file(self->tab, filename);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_write_file_HELP "write_file(path)\n\n" \
"This function writes tab to file(stream)"
static PyObject *Table_write_file(TableObject *self, PyObject *args, PyObject *kwds)
{
int rc;
//PyObject *stream = NULL;
FILE *f = NULL;
char *path = NULL;
char *kwlist[] = {"path", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist,
&path)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
if (!(f = fopen(path, "w")))
return UL_RaiseExc(errno);
rc = mnt_table_write_file(self->tab, f);
fclose(f);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_find_devno_HELP "find_devno(devno, [direction])\n\n" \
"Note that zero could be valid device number for root pseudo " \
"filesystem (e.g. tmpfs)\n" \
"Returns a tab entry or None"
static PyObject *Table_find_devno(TableObject *self, PyObject *args, PyObject *kwds)
{
dev_t devno;
int direction = MNT_ITER_BACKWARD;
char *kwlist[] = {"devno", "direction", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "I|i", kwlist, &devno, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_devno(self->tab, devno, direction));
}
#define Table_find_mountpoint_HELP "find_mountpoint(path, [direction])\n\n" \
"Returns a tab entry or None."
static PyObject *Table_find_mountpoint(TableObject *self, PyObject *args, PyObject *kwds)
{
char *path;
int direction = MNT_ITER_BACKWARD;
char *kwlist[] = {"path", "direction", NULL};
if (! PyArg_ParseTupleAndKeywords(args, kwds, "s|i", kwlist, &path, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_mountpoint(self->tab, path, direction));
}
#define Table_find_pair_HELP "find_pair(source, target, [direction])\n\n" \
"Returns a tab entry or None."
static PyObject *Table_find_pair(TableObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"source", "target", "direction", NULL};
char *source;
char *target;
int direction = MNT_ITER_BACKWARD;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss|i", kwlist, &source, &target, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_pair(self->tab, source, target, direction));
}
#define Table_find_source_HELP "find_source(source, [direction])\n\n" \
"Returns a tab entry or None."
static PyObject *Table_find_source(TableObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"source", "direction", NULL};
char *source;
int direction = MNT_ITER_BACKWARD;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|i", kwlist, &source, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_source(self->tab, source, direction));
}
#define Table_find_target_HELP "find_target(target, [direction])\n\n" \
"Try to lookup an entry in given tab, possible are three iterations, first\n" \
"with path, second with realpath(path) and third with realpath(path)\n" \
"against realpath(fs->target). The 2nd and 3rd iterations are not performed\n" \
"when tb cache is not set (cache not implemented yet).\n" \
"\n" \
"Returns a tab entry or None."
static PyObject *Table_find_target(TableObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"target", "direction", NULL};
char *target;
int direction = MNT_ITER_BACKWARD;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|i", kwlist, &target, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_target(self->tab, target, direction));
}
#define Table_find_srcpath_HELP "find_srcpath(srcpath, [direction])\n\n" \
"Try to lookup an entry in given tab, possible are four iterations, first\n" \
"with path, second with realpath(path), third with tags (LABEL, UUID, ..)\n" \
"from path and fourth with realpath(path) against realpath(entry->srcpath).\n" \
"\n" \
"The 2nd, 3rd and 4th iterations are not performed when tb cache is not\n" \
"set (not implemented yet).\n" \
"\n" \
"Note that None is a valid source path; it will be replaced with \"none\". The\n" \
"\"none\" is used in /proc/{mounts,self/mountinfo} for pseudo filesystems.\n" \
"\n" \
"Returns a tab entry or None."
static PyObject *Table_find_srcpath(TableObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"srcpath", "direction", NULL};
char *srcpath;
int direction = MNT_ITER_BACKWARD;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|i", kwlist, &srcpath, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_srcpath(self->tab, srcpath, direction));
}
#define Table_find_tag_HELP "find_tag(tag, val, [direction])\n\n" \
"Try to lookup an entry in given tab, first attempt is lookup by tag and\n" \
"val, for the second attempt the tag is evaluated (converted to the device\n" \
"name) and Tab.find_srcpath() is performed. The second attempt is not\n" \
"performed when tb cache is not set (not implemented yet).\n" \
"\n" \
"Returns a tab entry or NULL."
static PyObject *Table_find_tag(TableObject *self, PyObject *args, PyObject *kwds)
{
char *kwlist[] = {"tag", "val", "direction", NULL};
char *tag;
char *val;
int direction = MNT_ITER_BACKWARD;
if (!PyArg_ParseTupleAndKeywords(args, kwds, "ss|i", kwlist, &tag, &val, &direction)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyObjectResultFs(mnt_table_find_tag(self->tab, tag, val, direction));
}
static PyObject *Table_get_nents(TableObject *self)
{
return PyObjectResultInt(mnt_table_get_nents(self->tab));
}
#define Table_is_fs_mounted_HELP "is_fs_mounted(fstab_fs)\n\n" \
"Checks if the fstab_fs entry is already in the tb table. The \"swap\" is\n" \
"ignored. This function explicitly compares source, target and root of the\n" \
"filesystems.\n" \
"\n" \
"Note that source and target are canonicalized only if a cache for tb is\n" \
"defined (not implemented yet). The target canonicalization may\n" \
"trigger automount on autofs mountpoints!\n" \
"\n" \
"Don't use it if you want to know if a device is mounted, just use\n" \
"Tab.find_source() for the device.\n" \
"\n" \
"This function is designed mostly for \"mount -a\".\n" \
"\n" \
"Returns a boolean value."
static PyObject *Table_is_fs_mounted(TableObject *self, PyObject *args, PyObject *kwds)
{
FsObject *fs;
char *kwlist[] = {"fstab_fs", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!", kwlist, &FsType, &fs)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
return PyBool_FromLong(mnt_table_is_fs_mounted(self->tab, fs->fs));
}
#define Table_parse_file_HELP "parse_file(file)\n\n" \
"Parses whole table (e.g. /etc/mtab) and appends new records to the tab.\n" \
"\n" \
"The libmount parser ignores broken (syntax error) lines, these lines are\n" \
"reported to caller by errcb() function (see Tab.parser_errcb).\n" \
"\n" \
"Returns self or raises an exception in case of an error."
static PyObject *Table_parse_file(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
char *file = NULL;
char *kwlist[] = {"file", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &file)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_parse_file(self->tab, file);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_parse_fstab_HELP "parse_fstab([fstab])\n\n" \
"This function parses /etc/fstab and appends new lines to the tab. If the\n" \
"filename is a directory then Tab.parse_dir() is called.\n" \
"\n" \
"See also Tab.parser_errcb.\n" \
"\n" \
"Returns self or raises an exception in case of an error."
static PyObject *Table_parse_fstab(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
char *fstab = NULL;
char *kwlist[] = {"fstab", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|s", kwlist, &fstab)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_parse_fstab(self->tab, fstab);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_parse_mtab_HELP "parse_mtab([mtab])\n\n" \
"This function parses /etc/mtab or /proc/self/mountinfo\n" \
"/run/mount/utabs or /proc/mounts.\n" \
"\n" \
"See also Tab.parser_errcb().\n" \
"\n" \
"Returns self or raises an exception in case of an error."
static PyObject *Table_parse_mtab(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
char *mtab = NULL;
char *kwlist[] = {"mtab", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|s", kwlist, &mtab)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_parse_mtab(self->tab, mtab);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_parse_dir_HELP "parse_dir(dir)\n\n" \
"The directory:\n" \
"- files are sorted by strverscmp(3)\n" \
"- files that start with \".\" are ignored (e.g. \".10foo.fstab\")\n" \
"- files without the \".fstab\" extension are ignored\n" \
"\n" \
"Returns self or raises an exception in case of an error."
static PyObject *Table_parse_dir(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
char *dir = NULL;
char *kwlist[] = {"dir", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &dir)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_parse_dir(self->tab, dir);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_parse_swaps_HELP "parse_swaps(swaps)\n\n" \
"This function parses /proc/swaps and appends new lines to the tab"
static PyObject *Table_parse_swaps(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
char *swaps = NULL;
char *kwlist[] = {"swaps", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s", kwlist, &swaps)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_parse_swaps(self->tab, swaps);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_add_fs_HELP "add_fs(fs)\n\nAdds a new entry to tab.\n" \
"Returns self or raises an exception in case of an error."
static PyObject *Table_add_fs(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
FsObject *fs = NULL;
char *kwlist[] = {"fs", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!", kwlist, &FsType, &fs)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
Py_INCREF(fs);
rc = mnt_table_add_fs(self->tab, fs->fs);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_remove_fs_HELP "remove_fs(fs)\n\n" \
"Returns self or raises an exception in case of an error."
static PyObject *Table_remove_fs(TableObject *self, PyObject* args, PyObject *kwds)
{
int rc;
FsObject *fs = NULL;
char *kwlist[] = {"fs", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!", kwlist, &FsType, &fs)) {
PyErr_SetString(PyExc_TypeError, ARG_ERR);
return NULL;
}
rc = mnt_table_remove_fs(self->tab, fs->fs);
Py_DECREF(fs);
return rc ? UL_RaiseExc(-rc) : UL_IncRef(self);
}
#define Table_next_fs_HELP "next_fs()\n\n" \
"Returns the next Fs on success, raises an exception in case " \
"of an error and None at end of list.\n" \
"\n" \
"Example:\n" \
"<informalexample>\n" \
"<programlisting>\n" \
"import libmount\n" \
"import functools\n" \
"for fs in iter(functools.partial(tb.next_fs), None):\n" \
" dir = Fs.target\n" \
" print \"mount point: {:s}\".format(dir)\n" \
"\n" \
"</programlisting>\n" \
"</informalexample>\n" \
"\n" \
"lists all mountpoints from fstab in backward order."
static PyObject *Table_next_fs(TableObject *self)
{
struct libmnt_fs *fs;
int rc;
/* Reset the builtin iterator after reaching the end of the list */
rc = mnt_table_next_fs(self->tab, self->iter, &fs);
if (rc == 1) {
mnt_reset_iter(self->iter, MNT_ITER_FORWARD);
Py_RETURN_NONE;
}
if (rc)
return UL_RaiseExc(-rc);
return PyObjectResultFs(fs);
}
static PyMethodDef Table_methods[] = {
{"enable_comments", (PyCFunction)Table_enable_comments, METH_VARARGS|METH_KEYWORDS, Table_enable_comments_HELP},
{"find_pair", (PyCFunction)Table_find_pair, METH_VARARGS|METH_KEYWORDS, Table_find_pair_HELP},
{"find_source", (PyCFunction)Table_find_source, METH_VARARGS|METH_KEYWORDS, Table_find_source_HELP},
{"find_srcpath", (PyCFunction)Table_find_srcpath, METH_VARARGS|METH_KEYWORDS, Table_find_srcpath_HELP},
{"find_tag", (PyCFunction)Table_find_tag, METH_VARARGS|METH_KEYWORDS, Table_find_tag_HELP},
{"find_target", (PyCFunction)Table_find_target, METH_VARARGS|METH_KEYWORDS, Table_find_target_HELP},
{"find_devno", (PyCFunction)Table_find_devno, METH_VARARGS|METH_KEYWORDS, Table_find_devno_HELP},
{"find_mountpoint", (PyCFunction)Table_find_mountpoint, METH_VARARGS|METH_KEYWORDS, Table_find_mountpoint_HELP},
{"parse_file", (PyCFunction)Table_parse_file, METH_VARARGS|METH_KEYWORDS, Table_parse_file_HELP},
{"parse_fstab", (PyCFunction)Table_parse_fstab, METH_VARARGS|METH_KEYWORDS, Table_parse_fstab_HELP},
{"parse_mtab", (PyCFunction)Table_parse_mtab, METH_VARARGS|METH_KEYWORDS, Table_parse_mtab_HELP},
{"parse_dir", (PyCFunction)Table_parse_dir, METH_VARARGS|METH_KEYWORDS, Table_parse_dir_HELP},
{"parse_swaps", (PyCFunction)Table_parse_swaps, METH_VARARGS|METH_KEYWORDS, Table_parse_swaps_HELP},
{"is_fs_mounted", (PyCFunction)Table_is_fs_mounted, METH_VARARGS|METH_KEYWORDS, Table_is_fs_mounted_HELP},
{"add_fs", (PyCFunction)Table_add_fs, METH_VARARGS|METH_KEYWORDS, Table_add_fs_HELP},
{"remove_fs", (PyCFunction)Table_remove_fs, METH_VARARGS|METH_KEYWORDS, Table_remove_fs_HELP},
{"next_fs", (PyCFunction)Table_next_fs, METH_NOARGS, Table_next_fs_HELP},
{"write_file", (PyCFunction)Table_write_file, METH_VARARGS|METH_KEYWORDS, Table_write_file_HELP},
{"replace_file", (PyCFunction)Table_replace_file, METH_VARARGS|METH_KEYWORDS, Table_replace_file_HELP},
{NULL}
};
/* mnt_free_tab() with a few necessary additions */
void Table_unref(struct libmnt_table *tab)
{
struct libmnt_fs *fs;
struct libmnt_iter *iter;
if (!tab)
return;
DBG(TAB, pymnt_debug_h(tab, "un-referencing filesystems"));
iter = mnt_new_iter(MNT_ITER_BACKWARD);
/* remove pylibmount specific references to the entries */
while (mnt_table_next_fs(tab, iter, &fs) == 0)
Py_XDECREF(mnt_fs_get_userdata(fs));
DBG(TAB, pymnt_debug_h(tab, "un-referencing table"));
mnt_unref_table(tab);
mnt_free_iter(iter);
}
static void Table_destructor(TableObject *self)
{
DBG(TAB, pymnt_debug_h(self->tab, "destructor py-obj: %p, py-refcnt=%d",
self, (int) ((PyObject *) self)->ob_refcnt));
Table_unref(self->tab);
self->tab = NULL;
mnt_free_iter(self->iter);
Py_XDECREF(self->errcb);
PyFree(self);
}
static PyObject *Table_new(PyTypeObject *type,
PyObject *args __attribute__((unused)),
PyObject *kwds __attribute__((unused)))
{
TableObject *self = (TableObject*)type->tp_alloc(type, 0);
if (self) {
DBG(TAB, pymnt_debug_h(self, "new"));
self->tab = NULL;
self->iter = NULL;
self->errcb = NULL;
}
return (PyObject *)self;
}
/* explicit tab.__init__() serves as mnt_reset_table(tab) would in C
* and as mnt_new_table{,_from_dir,_from_file}() with proper arguments */
#define Table_HELP "Table(path=None, errcb=None)"
static int Table_init(TableObject *self, PyObject *args, PyObject *kwds)
{
struct libmnt_cache *cache;
char *path = NULL;
char *kwlist[] = {"path", "errcb", NULL};
PyObject *errcb = NULL;
struct stat buf;
memset (&buf, 0, sizeof(struct stat));
if (!PyArg_ParseTupleAndKeywords(args, kwds, "|sO",
kwlist, &path, &errcb))
return -1;
DBG(TAB, pymnt_debug_h(self, "init"));
Table_unref(self->tab);
self->tab = NULL;
if (self->iter)
mnt_reset_iter(self->iter, MNT_ITER_FORWARD);
else
self->iter = mnt_new_iter(MNT_ITER_FORWARD);
if (errcb) {
PyObject *tmp;
if (!PyCallable_Check(errcb))
return -1;
tmp = self->errcb;
Py_INCREF(errcb);
self->errcb = errcb;
Py_XDECREF(tmp);
} else {
Py_XDECREF(self->errcb);
self->errcb = NULL;
}
if (path) {
DBG(TAB, pymnt_debug_h(self, "init: path defined (%s)", path));
if (stat(path, &buf)) {
/* TODO: weird */
PyErr_SetFromErrno(PyExc_RuntimeError);
return -1;
}
if (S_ISREG(buf.st_mode))
self->tab = mnt_new_table_from_file(path);
else if (S_ISDIR(buf.st_mode))
self->tab = mnt_new_table_from_dir(path);
} else {
DBG(TAB, pymnt_debug_h(self, "init: allocate empty table"));
self->tab = mnt_new_table();
}
/* Always set custom handler when using libmount from python */
mnt_table_set_parser_errcb(self->tab, pymnt_table_parser_errcb);
mnt_table_set_userdata(self->tab, self);
cache = mnt_new_cache(); /* TODO: make it optional? */
if (!cache)
return -1;
mnt_table_set_cache(self->tab, cache);
mnt_unref_cache(cache);
return 0;
}
/* Handler for the tab->errcb callback */
int pymnt_table_parser_errcb(struct libmnt_table *tb, const char *filename, int line)
{
int rc = 0;
PyObject *obj;
obj = mnt_table_get_userdata(tb);
if (obj && ((TableObject*) obj)->errcb) {
PyObject *arglist, *result;
arglist = Py_BuildValue("(Osi)", obj, filename, line);
if (!arglist)
return -ENOMEM;
/* A python callback was set, so tb is definitely encapsulated in an object */
result = PyObject_Call(((TableObject *)obj)->errcb, arglist, NULL);
Py_DECREF(arglist);
if (!result)
return -EINVAL;
if (!PyArg_Parse(result, "i", &rc))
rc = -EINVAL;
Py_DECREF(result);
}
return rc;
}
PyObject *PyObjectResultTab(struct libmnt_table *tab)
{
TableObject *result;
if (!tab) {
PyErr_SetString(LibmountError, "internal exception");
return NULL;
}
result = mnt_table_get_userdata(tab);
if (result) {
Py_INCREF(result);
DBG(TAB, pymnt_debug_h(tab, "result py-obj %p: already exists, py-refcnt=%d",
result, (int) ((PyObject *) result)->ob_refcnt));
return (PyObject *) result;
}
/* Creating an encapsulating object: increment the refcount, so that
* code such as: cxt.get_fstab() doesn't call the destructor, which
* would free our tab struct as well
*/
result = PyObject_New(TableObject, &TableType);
if (!result) {
UL_RaiseExc(ENOMEM);
return NULL;
}
Py_INCREF(result);
mnt_ref_table(tab);
DBG(TAB, pymnt_debug_h(tab, "result py-obj %p new, py-refcnt=%d",
result, (int) ((PyObject *) result)->ob_refcnt));
result->tab = tab;
result->iter = mnt_new_iter(MNT_ITER_FORWARD);
mnt_table_set_userdata(result->tab, result);
result->errcb = NULL;
return (PyObject *) result;
}
static PyGetSetDef Table_getseters[] = {
{"nents", (getter)Table_get_nents, NULL, "number of valid entries in tab", NULL},
{"intro_comment", (getter)Table_get_intro_comment, (setter)Table_set_intro_comment, "fstab intro comment", NULL},
{"trailing_comment", (getter)Table_get_trailing_comment, (setter)Table_set_trailing_comment, "fstab trailing comment", NULL},
{"errcb", NULL, (setter)Table_set_parser_errcb, "parser error callback", NULL},
{NULL}
};
static PyObject *Table_repr(TableObject *self)
{
return PyUnicode_FromFormat(
"<libmount.Table object at %p, entries=%d, comments_enabled=%s, errcb=%s>",
self,
mnt_table_get_nents(self->tab),
mnt_table_with_comments(self->tab) ? "True" : "False",
self->errcb ? pystos(PyObject_Repr(self->errcb)) : "None");
}
PyTypeObject TableType = {
PyVarObject_HEAD_INIT(NULL, 0)
.tp_name = "libmount.Table",
.tp_basicsize = sizeof(TableObject),
.tp_dealloc = (destructor)Table_destructor,
.tp_repr = (reprfunc) Table_repr,
.tp_flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
.tp_doc = Table_HELP,
.tp_methods = Table_methods,
.tp_members = Table_members,
.tp_getset = Table_getseters,
.tp_init = (initproc)Table_init,
.tp_new = Table_new,
};
void Table_AddModuleObject(PyObject *mod)
{
if (PyType_Ready(&TableType) < 0)
return;
DBG(TAB, pymnt_debug("add to module"));
Py_INCREF(&TableType);
PyModule_AddObject(mod, "Table", (PyObject *)&TableType);
}

View file

@ -0,0 +1,173 @@
import os
import sys
import stat
import errno
# use "import libmount" for in a standard way installed python binding
import pylibmount as mnt
def usage(tss):
print("\nUsage:\n\t{:s} <test> [testoptions]\nTests:\n".format(sys.argv[0]))
for i in tss:
print("\t{15:-s}".format(i[0]))
if i[2] != "":
print(" {:s}\n".format(i[2]))
print("\n")
return 1
def mnt_run_test(tss, argv):
rc = -1
if ((len(argv) < 2) or (argv[1] == "--help") or (argv[1] == "-h")):
return usage(tss)
#mnt_init_debug(0)
i=()
for i in tss:
if i[0] == argv[1]:
rc = i[1](i, argv[1:])
if rc:
print("FAILED [rc={:d}]".format(rc))
break
if ((rc < 0) and (i == ())):
return usage(tss)
return not not rc #because !!rc is too mainstream for python
def test_mount(ts, argv):
idx = 1
rc = 0
if len(argv) < 2:
return -errno.EINVAL
cxt = mnt.Context()
if argv[idx] == "-o":
cxt.options = argv[idx+1]
idx += 2
if argv[idx] == "-t":
cxt.fstype = argv[idx+1]
idx += 2
if len(argv) == idx + 1:
cxt.target = argv[idx]
idx+=1
elif (len(argv) == idx + 2):
cxt.source = argv[idx]
idx += 1
cxt.target = argv[idx]
idx += 1
try:
cxt.mount()
except Exception:
print("failed to mount")
return -1
print("successfully mounted")
return rc
def test_umount(ts, argv):
idx = 1
rc = 0
if len(argv) < 2:
return -errno.EINVAL
cxt = mnt.Context()
if argv[idx] == "-t":
cxt.options = argv[idx+1]
idx += 2
if argv[idx] == "-f":
cxt.enable_force(True)
if argv[idx] == "-l":
cxt.enable_lazy(True)
idx += 1
elif argv[idx] == "-r":
cxt.enable_rdonly_umount(True)
idx += 1
if len(argv) == idx + 1:
cxt.target = argv[idx]
idx += 1
else:
return -errno.EINVAL
try:
cxt.umount()
except Exception:
print("failed to umount")
return 1
print("successfully umounted")
return rc
def test_flags(ts, argv):
idx = 1
rc = 0
opt = ""
flags = 0
cxt = mnt.Context()
if argv[idx] == "-o":
cxt.options = argv[idx + 1]
idx += 2
if len(argv) == idx + 1:
cxt.target = argv[idx]
idx += 1
try:
cxt.prepare_mount()
# catch ioerror here
except IOError as e:
print("failed to prepare mount {:s}".format(e.strerror))
opt = cxt.fs.options
if (opt):
print("options: {:s}", opt)
print("flags: {08:lx}".format(cxt.mflags()))
return rc
def test_mountall(ts, argv):
mntrc = 1
ignored = 1
idx = 1
cxt = mnt.Context()
if len(argv) > 2:
if argv[idx] == "-O":
cxt.options_pattern = argv[idx+1]
idx += 2
if argv[idx] == "-t":
cxt.fstype_pattern = argv[idx+1]
idx += 2
i = ()
while (cxt.next_mount()):
tgt = i.target
if (ignored == 1):
print("{:s}: ignored: not match".format(tgt))
elif (ignored == 2):
print("{:s}: ignored: already mounted".format(tgt))
elif (not cxt.status):
if (mntrc > 0):
# ?? errno = mntrc
print("{:s}: mount failed".format(tgt))
else:
print("{:s}: mount failed".format(tgt))
else:
print("{:s}: successfully mounted".format(tgt))
return 0
tss = (
( "--mount", test_mount, "[-o <opts>] [-t <type>] <spec>|<src> <target>" ),
( "--umount", test_umount, "[-t <type>] [-f][-l][-r] <src>|<target>" ),
( "--mount-all", test_mountall, "[-O <pattern>] [-t <pattern] mount all filesystems from fstab" ),
( "--flags", test_flags, "[-o <opts>] <spec>" )
)
os.umask(stat.S_IWGRP | stat.S_IWOTH) #to be compatible with mount(8)
sys.exit(mnt_run_test(tss, sys.argv))

164
libmount/python/test_mount_tab.py Executable file
View file

@ -0,0 +1,164 @@
import os
import sys
import stat
import errno
import functools as ft
# use "import libmount" for in a standard way installed python binding
import pylibmount as mnt
def usage(tss):
print("\nUsage:\n\t{:s} <test> [testoptions]\nTests:\n".format(sys.argv[0]))
for i in tss:
print("\t{15:-s}".format(i[0]))
if i[2] != "":
print(" {:s}\n".format(i[2]))
print("\n")
return 1
def mnt_run_test(tss, argv):
rc = -1
if ((len(argv) < 2) or (argv[1] == "--help") or (argv[1] == "-h")):
return usage(tss)
#mnt_init_debug(0)
i=()
for i in tss:
if i[0] == argv[1]:
rc = i[1](i, argv[1:])
if rc:
print("FAILED [rc={:d}]".format(rc))
break
if ((rc < 0) and (i == ())):
return usage(tss)
return not not rc #because !!rc is too mainstream for python
def parser_errcb(tb, fname, line):
print("{:s}:{:d}: parse error".format(fname, line))
return 1
def create_table(f, comments):
if not f:
return None
tb = mnt.Table()
tb.enable_comments(comments)
tb.errcb = parser_errcb
try:
tb.parse_file(f)
except Exception:
print("{:s}: parsing failed".format(f))
return None
return tb
def test_copy_fs(ts, argv):
rc = -1
tb = create_table(argv[1], False)
fs = tb.find_target("/", mnt.MNT_ITER_FORWARD)
if not fs:
return rc
print("ORIGINAL:")
fs.print_debug()
fs = fs.copy_fs(None)
if not fs:
return rc
print("COPY:")
fs.print_debug()
return 0
def test_parse(ts, argv):
parse_comments = False
if len(argv) == 3 and argv[2] == "--comments":
parse_comments = True
tb = create_table(argv[1], parse_comments)
if tb.intro_comment:
print("Initial comment:\n\"{:s}\"".format(tb.intro_comment))
#while ((fs = tb.next_fs()) != None):
for fs in iter(ft.partial(tb.next_fs), None):
fs.print_debug()
if tb.trailing_comment:
print("Trailing comment:\n\"{:s}\"".format(tb.trailing_comment))
return 0
def test_find(ts, argv, dr):
if len(argv) != 4:
print("try --help")
return -errno.EINVAL
f, find, what = argv[1:]
tb = create_table(f, False)
if find.lower() == "source":
fs = tb.find_source(what, dr)
elif find.lower() == "target":
fs = tb.find_target(what, dr)
if not fs:
print("{:s}: not found {:s} '{:s}'".format(f, find, what))
else:
fs.print_debug()
return 0
def test_find_fw(ts, argv):
return test_find(ts, argv, mnt.MNT_ITER_FORWARD)
def test_find_bw(ts, argv):
return test_find(ts, argv, mnt.MNT_ITER_BACKWARD)
def test_find_pair(ts, argv):
rc = -1
tb = create_table(argv[1], False)
fs = tb.find_pair(argv[2], argv[3], mnt.MNT_ITER_FORWARD)
if not fs:
return rc
fs.print_debug()
return 0
def test_is_mounted(ts, argv):
rc = -1
tb = mnt.Tab(path="/proc/self/mountinfo")
if not tb:
print("failed to parse mountinto")
return rc
fstab = create_table(argv[1], False)
if not fstab:
return rc
fs = ()
for fs in ft.iter(tb.next_fs(), -1):
if tb.is_fs_mounted(fs):
print("{:s} already mounted on {:s}".format(fs.source, fs.target))
else:
print("{:s} not mounted on {:s}".format(fs.source, fs.target))
return 0
def test_find_mountpoint(ts, argv):
rc = -1
tb = mnt.Table("/proc/self/mountinfo")
if not tb:
return rc
fs = tb.find_mountpoint(argv[1], mnt.MNT_ITER_BACKWARD)
if not fs:
return rc
fs.print_debug()
return 0
tss = (
( "--parse", test_parse, "<file> [--comments] parse and print(tab" ),
( "--find-forward", test_find_fw, "<file> <source|target> <string>" ),
( "--find-backward", test_find_bw, "<file> <source|target> <string>" ),
( "--find-pair", test_find_pair, "<file> <source> <target>" ),
( "--find-mountpoint", test_find_mountpoint, "<path>" ),
( "--copy-fs", test_copy_fs, "<file> copy root FS from the file" ),
( "--is-mounted", test_is_mounted, "<fstab> check what from <file> are already mounted" ),
)
sys.exit(mnt_run_test(tss, sys.argv))

View file

@ -0,0 +1,59 @@
import os
import sys
import stat
import errno
# use "import libmount" for in a standard way installed python binding
import pylibmount as mnt
def usage(tss):
print("\nUsage:\n\t{:s} <test> [testoptions]\nTests:\n".format(sys.argv[0]))
for i in tss:
print("\t{15:-s}".format(i[0]))
if i[2] != "":
print(" {:s}\n".format(i[2]))
print("\n")
return 1
def mnt_run_test(tss, argv):
rc = -1
if ((len(argv) < 2) or (argv[1] == "--help") or (argv[1] == "-h")):
return usage(tss)
#mnt_init_debug(0)
i=()
for i in tss:
if i[0] == argv[1]:
rc = i[1](i, argv[1:])
if rc:
print("FAILED [rc={:d}]".format(rc))
break
if ((rc < 0) and (i == ())):
return usage(tss)
return not not rc #because !!rc is too mainstream for python
def test_replace(ts, argv):
fs = mnt.Fs()
tb = mnt.Table()
if (len(argv) < 3):
return -1
tb.enable_comments(True)
tb.parse_fstab()
fs.source = argv[1]
fs.target = argv[2]
#TODO: resolve None + string
fs.comment = "# this is new filesystem\n"
tb.add_fs(fs)
tb.replace_file(os.environ["LIBMOUNT_FSTAB"])
return 0
tss = (
( "--replace",test_replace, "<src> <target> Add a line to LIBMOUNT_FSTAB and replace the original file" ),
)
sys.exit(mnt_run_test(tss, sys.argv))