# Copyright 2019, 2020 Jonas Eriksson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tool helpers; used to run commands for the simple disk image package
"""
import os
import re
import subprocess
import tempfile
import logging
import distutils.spawn
from .common import logger, DiskImageException
[docs]class DoubleQuoteInExtFile(DiskImageException):
"""
debugfs does not cope well with double quotes in file names, which was
detected.
"""
[docs]class MkfsExt(Tool):
"""
Tool wrapper for mkfs.ext*
"""
[docs] def mkfs(self, device, label=None, initial_data_root=None):
"""
Create filesystem
:param device: Device, typically a file in our use case
:param label: Filesystem label
"""
args = []
if label is not None:
args.append('-L')
args.append(label)
if initial_data_root is not None:
args.append('-d')
args.append(initial_data_root)
args.append(device)
self.call(*args)
[docs]class MkfsExt2(MkfsExt):
"""
Tool wrapper for mkfs.ext2
"""
def __init__(self):
super().__init__('mkfs.ext2')
[docs]class MkfsExt3(MkfsExt):
"""
Tool wrapper for mkfs.ext3
"""
def __init__(self):
super().__init__('mkfs.ext3')
[docs]class MkfsExt4(MkfsExt):
"""
Tool wrapper for mkfs.ext4
"""
def __init__(self):
super().__init__('mkfs.ext4')
[docs]class MkfsFAT(Tool):
"""
Tool wrapper for mkfs.fat
"""
def __init__(self, fat_size):
self._fat_size = str(fat_size)
super().__init__('mkfs.fat')
[docs] def mkfs(self, device, label=None, initial_data_root=None):
"""
Create filesystem
:param device: Device, typically a file in our use case
:param label: Filesystem label
"""
args = ['-F', self._fat_size]
if label is not None:
args.append('-n')
args.append(label)
if initial_data_root is not None:
raise DiskImageException("Got initial_data_root into a non-ext "
"mkfs, this should not happen")
args.append(device)
self.call(*args)
[docs]class MkfsFAT12(MkfsFAT):
"""
Tool wrapper for mkfs.fat -F 12
"""
def __init__(self):
super().__init__(12)
[docs]class MkfsFAT16(MkfsFAT):
"""
Tool wrapper for mkfs.fat -F 16
"""
def __init__(self):
super().__init__(16)
[docs]class MkfsFAT32(MkfsFAT):
"""
Tool wrapper for mkfs.fat -F 32
"""
def __init__(self):
super().__init__(32)
def _dq_check(filename):
if '"' in filename:
raise DoubleQuoteInExtFile("The filename {} contains double quotes "
"which are not supported by the Ext file "
"system tools".format(filename))
def _ext_cmds_mkdir(directories):
cmds = []
for directory in directories:
# Make directory path absolute and remove multiple slashes
directory = re.sub(r'/+', '/', '/' + directory)
_dq_check(directory)
cmds.append('mkdir "{}"'.format(directory))
return cmds
def _ext_cmds_write(sources, destination):
# Quick exit if sources are empty
if not sources:
return []
cmds = []
# Make all cd:s absolute, and clean out multiple slashes as those will
# break debugfs
destination = re.sub(r'/+', '/', '/' + destination)
_dq_check(destination)
cmds.append('cd "{}"'.format(destination))
for source in sources:
file_name = os.path.basename(source)
_dq_check(file_name)
cmds.append('write "{}" "{}"'.format(source, file_name))
return cmds
[docs]class PopulateExt(Tool):
"""
Tool wrapper for debugfs
"""
def __init__(self):
super().__init__('debugfs')
[docs] def run(self, device, actions):
"""
Perform the requested actions using the tool.
:param device: Device to perfom the actions on
:param actions: Actions to perform
"""
commands = []
for action_tuple in actions:
action = list(action_tuple)
verb = action.pop(0)
if verb == 'mkdir':
dirs = action[0]
commands.extend(_ext_cmds_mkdir(dirs))
elif verb == 'copy':
sources = action[0]
destination = action[1]
commands.extend(_ext_cmds_write(sources, destination))
elif verb == 'copy recursive':
sources = action[0]
destination = action[1]
for src in sources:
base_path = os.path.join(destination,
os.path.basename(src))
if os.path.isdir(src):
commands.extend(_ext_cmds_mkdir([base_path]))
for parent, _dirs, _files in os.walk(src):
current_dest = "{}/{}".format(base_path,
parent[len(src):])
dirs = [os.path.join(current_dest, _dir) for _dir in _dirs]
commands.extend(_ext_cmds_mkdir(dirs))
files = [os.path.join(parent, _files)
for _files in _files]
commands.extend(_ext_cmds_write(files, current_dest))
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Compiled a debugfs script:")
for command in commands:
logger.debug(" %s", command)
_fd, commands_file = tempfile.mkstemp()
with open(commands_file, 'w') as handle:
handle.write("\n".join(commands))
self.call('-w', '-f', commands_file, device)
os.unlink(commands_file)
def _fat_format_dest(path):
if path[0] == '/':
path = path[1:]
return '::' + path
[docs]class Sfdisk(Tool):
"""
Tool wrapper for sfdisk
"""
def __init__(self):
super().__init__('sfdisk')
[docs]class PopulateFAT():
"""
tool wrapper for mtools
"""
def __init__(self):
self._mcopy = Tool('mcopy')
self._mmd = Tool('mmd')
[docs] def check(self):
"""
Check that both wrapped tools are available
"""
return self._mcopy.check() and self._mmd.check()
[docs] def run(self, device, actions):
"""
Perform the requested actions using the tool.
:param device: Device to perfom the actions on
:param actions: Actions to perform
"""
for action_tuple in actions:
action = list(action_tuple)
verb = action.pop(0)
if verb == 'mkdir':
dirs = [_fat_format_dest(p) for p in action[0]]
self._mmd.call('-i', device, *dirs)
elif verb == 'copy':
sources = action[0]
destination = _fat_format_dest(action[1])
self._mcopy.call('-i', device, '-bQ', *sources, destination)
elif verb == 'copy recursive':
sources = action[0]
destination = _fat_format_dest(action[1])
self._mcopy.call('-i', device, '-bsQ', *sources, destination)
# pylint: disable=bad-whitespace
_TOOLS = {
('ext2', 'mkfs'): MkfsExt2,
('ext3', 'mkfs'): MkfsExt3,
('ext4', 'mkfs'): MkfsExt4,
('fat12', 'mkfs'): MkfsFAT12,
('fat16', 'mkfs'): MkfsFAT16,
('fat32', 'mkfs'): MkfsFAT32,
('ext2', 'populate'): PopulateExt,
('ext3', 'populate'): PopulateExt,
('ext4', 'populate'): PopulateExt,
('fat12', 'populate'): PopulateFAT,
('fat16', 'populate'): PopulateFAT,
('fat32', 'populate'): PopulateFAT,
('none', 'sfdisk'): Sfdisk,
}
_TOOLS_CACHE = {}