Do you know tee program? Its man page reads:

tee - read from standard input and write to standard output and files

It makes it easy to split output of one program into both stdout and files. It’s a nice UNIX tool. Recently I was doing code review and it turned out that equivalent of such thing may be pretty useful in Python programs too:

with open("file1.txt") as f1, tee(open("file2.txt")) as f2:
  shutil.copyfileobj(f1, f2)
  if f2.tail not in ('\r', '\n'):
    f2.fileobj.write('\n')

It allows to do extra work, so we can employ it to e.g. simultaneous hash calculation or other job.


I came up with this idea whilst reviewing some code. I saw following function (anonymized).

    def _some_private_method(cls, paths: Iterable[str]):
        special_paths = filter(is_special_path, paths)

        with open(FILEPATH, "wb") as out_file:
            for path in special_paths:
                LOGGER.info(f"Adding {path}")
                with open(path, "rb") as additional_file:
                    shutil.copyfileobj(additional_file, out_file)

                    additional_file.seek(-1, 2)
                    last_byte = additional_file.read()
                    if last_byte != b"\n" and last_byte != b"\r":
                        out_file.write(b"\n")

I don’t like such code. The seek hack is obscure. What can be done to make it better? What if we simply remembered what was the last byte copied by shutil.copyfileobj?

Unfortunately, copyfileobj accepts only two fileobjs and buffer size. Recently I was experimenting with indexed_gzip and I had to roll out my own copy of copyfileobj that apart from copying the data was also calculating md5 hash and number of bytes copied.

An alternative is to wrap one of the arguments with something that will do whatever we want. Let’s focus on the problem at hand: adding newline if necessary.

@dataclass
class MyTee:
    fileobj: io.BufferedReader
    tail: bytes = field(init=False)

    def write(self, data):
        self.tail = data[-1:]  # without a colon it would not become bytes()
        self.fileobj.write(data)

If we need to remember k last characters, we can simply use collections.deque as tail and it will work as a circular buffer.

In order to make it look like in the first listing we need to add trivial context manager:

@contextlib.contextmanager
def tee(fileobj):
    yield MyTee(fileobj)

And voille-a!

This mechanism can be further improved to be more flexible etc.