Files

356 lines
11 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import io
import logging
import os
import shutil
import sys
import tempfile
from collections import OrderedDict
from contextlib import contextmanager
from .compat import IS_TYPE_CHECKING, PY2, StringIO, to_env
from .parser import Binding, parse_stream
from .variables import parse_variables
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Aliases consumed by the `# type:` comments in this module. They are defined
# only when IS_TYPE_CHECKING is true, so `typing` is not imported otherwise.
if IS_TYPE_CHECKING:
    from typing import (IO, Dict, Iterable, Iterator, Mapping, Optional, Text,
                        Tuple, Union)
    # os.PathLike is used from Python 3.6 on; older versions fall back to Text.
    if sys.version_info >= (3, 6):
        _PathLike = os.PathLike
    else:
        _PathLike = Text

    # On Python 2 the alias is spelled as StringIO parametrised with Text.
    if sys.version_info >= (3, 0):
        _StringIO = StringIO
    else:
        _StringIO = StringIO[Text]
def with_warn_for_invalid_lines(mappings):
    # type: (Iterator[Binding]) -> Iterator[Binding]
    """
    Pass every binding through unchanged, logging a warning for each one
    that is flagged with a parse error.

    The warning reports the line number stored on the binding's original text.
    """
    for binding in mappings:
        failed = binding.error
        if failed:
            start_line = binding.original.line
            logger.warning(
                "Python-dotenv could not parse statement starting at line %s",
                start_line,
            )
        # Invalid bindings are still yielded; callers decide what to do with them.
        yield binding
class DotEnv():
    """
    Lazily parsed view of one dotenv source (a filesystem path or a StringIO).

    Nothing is read at construction time. The first call to `dict()` parses
    the source and the result is cached on the instance.
    """

    def __init__(self, dotenv_path, verbose=False, encoding=None, interpolate=True, override=True):
        # type: (Union[Text, _PathLike, _StringIO], bool, Union[None, Text], bool, bool) -> None
        """
        Store the configuration; no I/O happens here.

        - *dotenv_path*: path to the dotenv file, or a `StringIO` holding its content.
        - *verbose*: whether to log when the file or a requested key is missing.
        - *encoding*: encoding passed to `io.open` when reading from a path.
        - *interpolate*: whether values go through `resolve_variables`.
        - *override*: whether values from the file win over `os.environ`.
        """
        self.dotenv_path = dotenv_path  # type: Union[Text,_PathLike, _StringIO]
        # None means "not parsed yet"; an empty mapping is a valid cached result.
        self._dict = None  # type: Optional[Dict[Text, Optional[Text]]]
        self.verbose = verbose  # type: bool
        self.encoding = encoding  # type: Union[None, Text]
        self.interpolate = interpolate  # type: bool
        self.override = override  # type: bool

    @contextmanager
    def _get_stream(self):
        # type: () -> Iterator[IO[Text]]
        """
        Yield a text stream for the configured source.

        A `StringIO` source is yielded as-is (and not closed here). A path to
        an existing file is opened and closed around the `with` body. Anything
        else yields an empty stream, so a missing file behaves like an empty one.
        """
        if isinstance(self.dotenv_path, StringIO):
            yield self.dotenv_path
        elif os.path.isfile(self.dotenv_path):
            with io.open(self.dotenv_path, encoding=self.encoding) as stream:
                yield stream
        else:
            if self.verbose:
                logger.info("Python-dotenv could not find configuration file %s.", self.dotenv_path or '.env')
            yield StringIO('')

    def dict(self):
        # type: () -> Dict[Text, Optional[Text]]
        """Return dotenv as dict, parsing the source only on the first call."""
        # Compare against None rather than truthiness: an empty result is
        # still a result and must not trigger a re-parse on every call.
        if self._dict is not None:
            return self._dict

        raw_values = self.parse()

        if self.interpolate:
            self._dict = OrderedDict(resolve_variables(raw_values, override=self.override))
        else:
            self._dict = OrderedDict(raw_values)

        return self._dict

    def parse(self):
        # type: () -> Iterator[Tuple[Text, Optional[Text]]]
        """
        Yield `(key, value)` pairs for every parsed binding that has a key.

        Bindings without a key are skipped. Bindings flagged as invalid are
        reported through `with_warn_for_invalid_lines`.
        """
        with self._get_stream() as stream:
            for mapping in with_warn_for_invalid_lines(parse_stream(stream)):
                if mapping.key is not None:
                    yield mapping.key, mapping.value

    def set_as_environment_variables(self):
        # type: () -> bool
        """
        Load the current dotenv as system environment variable.

        Keys already present in `os.environ` are left untouched unless
        `override` is set. Keys whose value is `None` are never exported.
        Always returns `True`.
        """
        for k, v in self.dict().items():
            if k in os.environ and not self.override:
                continue
            if v is not None:
                os.environ[to_env(k)] = to_env(v)

        return True

    def get(self, key):
        # type: (Text) -> Optional[Text]
        """
        Return the value stored for *key*, or `None` when the key is absent.

        A missing key is logged as a warning when `verbose` is set. Note that
        a key that is present without a value also yields `None`.
        """
        data = self.dict()

        if key in data:
            return data[key]

        if self.verbose:
            logger.warning("Key %s not found in %s.", key, self.dotenv_path)

        return None
def get_key(dotenv_path, key_to_get):
    # type: (Union[Text, _PathLike], Text) -> Optional[Text]
    """
    Look up a single key in the given .env file.

    Returns the key's value, or `None` when the key is not found. The lookup
    runs in verbose mode, so a missing file or key is logged.
    """
    reader = DotEnv(dotenv_path, verbose=True)
    return reader.get(key_to_get)
@contextmanager
def rewrite(path):
    # type: (_PathLike) -> Iterator[Tuple[IO[Text], IO[Text]]]
    """
    Context manager for rewriting the file at *path* through a temporary file.

    Yields `(source, dest)`: *source* is the existing file opened for reading
    and *dest* is a named temporary file opened for writing. If *path* does
    not exist yet, an empty file is created first.

    When the `with` body finishes normally, the temporary file replaces
    *path*. When anything raises, the temporary file is deleted, *path* keeps
    its previous content, and the exception propagates unchanged.
    """
    # Bound up front so the cleanup below is safe even when the failure
    # happens before the temporary file has been created.
    dest = None
    try:
        if not os.path.isfile(path):
            with io.open(path, "w+") as source:
                source.write("")
        # delete=False: the file must outlive its handle so it can be moved.
        with tempfile.NamedTemporaryFile(mode="w+", delete=False) as dest:
            with io.open(path) as source:
                yield (source, dest)  # type: ignore
    except BaseException:
        if dest is not None and os.path.isfile(dest.name):
            os.unlink(dest.name)
        raise
    else:
        shutil.move(dest.name, path)
def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always", export=False):
    # type: (_PathLike, Text, Text, Text, bool) -> Tuple[Optional[bool], Text, Text]
    """
    Adds or Updates a key/value to the given .env

    The file is rewritten through `rewrite`: every existing binding for
    *key_to_set* is replaced, and if there is none the new line is appended.

    - *value_to_set*: surrounding single and double quotes are stripped first.
    - *quote_mode*: `"always"` wraps the value in double quotes (escaping
      embedded ones); any other value writes it bare, unless the value
      contains a space, in which case quoting is forced.
    - *export*: whether to prefix the line with `export `.

    Returns `(True, key_to_set, value_to_set)`, where the value is the one
    left after quote stripping.
    """
    value_to_set = value_to_set.strip("'").strip('"')
    # An unquoted value containing a space would not survive being written bare.
    if " " in value_to_set:
        quote_mode = "always"

    if quote_mode == "always":
        value_out = '"{}"'.format(value_to_set.replace('"', '\\"'))
    else:
        value_out = value_to_set
    if export:
        line_out = 'export {}={}\n'.format(key_to_set, value_out)
    else:
        line_out = "{}={}\n".format(key_to_set, value_out)

    with rewrite(dotenv_path) as (source, dest):
        replaced = False
        # Tracks whether the text copied most recently ended without a newline.
        missing_newline = False
        for mapping in with_warn_for_invalid_lines(parse_stream(source)):
            if mapping.key == key_to_set:
                dest.write(line_out)
                replaced = True
            else:
                dest.write(mapping.original.string)
                missing_newline = not mapping.original.string.endswith("\n")
        if not replaced:
            # Without this, a file whose last line has no trailing newline
            # would get the new binding glued onto that last line.
            if missing_newline:
                dest.write("\n")
            dest.write(line_out)

    return True, key_to_set, value_to_set
def unset_key(dotenv_path, key_to_unset, quote_mode="always"):
    # type: (_PathLike, Text, Text) -> Tuple[Optional[bool], Text]
    """
    Delete every binding for a key from the given .env file.

    Returns `(True, key_to_unset)` when at least one binding was dropped.
    Returns `(None, key_to_unset)`, after logging a warning, when the file
    does not exist or the key is not in it.
    *quote_mode* is accepted but not used.
    """
    if not os.path.exists(dotenv_path):
        logger.warning("Can't delete from %s - it doesn't exist.", dotenv_path)
        return None, key_to_unset

    found = False
    with rewrite(dotenv_path) as (source, dest):
        for binding in with_warn_for_invalid_lines(parse_stream(source)):
            if binding.key != key_to_unset:
                # Everything except the target key is copied over verbatim.
                dest.write(binding.original.string)
                continue
            found = True

    if found:
        return found, key_to_unset

    logger.warning("Key %s not removed from %s - key doesn't exist.", key_to_unset, dotenv_path)
    return None, key_to_unset
def resolve_variables(values, override):
    # type: (Iterable[Tuple[Text, Optional[Text]]], bool) -> Mapping[Text, Optional[Text]]
    """
    Expand variable references in each value and return the resolved mapping.

    References are looked up in `os.environ` and in the values resolved so
    far. With *override* set, earlier values from the file take precedence
    over the environment; otherwise the environment wins. `None` values are
    passed through untouched.
    """
    resolved = {}  # type: Dict[Text, Optional[Text]]

    for name, value in values:
        if value is None:
            resolved[name] = None
            continue

        atoms = parse_variables(value)
        # Later layers overwrite earlier ones, so the last layer has priority.
        layers = (os.environ, resolved) if override else (resolved, os.environ)
        env = {}  # type: Dict[Text, Optional[Text]]
        for layer in layers:
            env.update(layer)  # type: ignore
        resolved[name] = "".join(atom.resolve(env) for atom in atoms)

    return resolved
def _walk_to_root(path):
# type: (Text) -> Iterator[Text]
"""
Yield directories starting from the given directory up to the root
"""
if not os.path.exists(path):
raise IOError('Starting path not found')
if os.path.isfile(path):
path = os.path.dirname(path)
last_dir = None
current_dir = os.path.abspath(path)
while last_dir != current_dir:
yield current_dir
parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
last_dir, current_dir = current_dir, parent_dir
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
    # type: (Text, bool, bool) -> Text
    """
    Look for *filename* in a starting directory and then in each of its
    ancestors, returning the first match.

    The starting directory is the current working directory when *usecwd* is
    set, when running interactively, or when `sys.frozen` is set; otherwise
    it is the directory of the first calling frame outside this file.

    Returns the path to the file, or an empty string when nothing is found
    (or raises IOError instead if *raise_error_if_not_found* is set).
    """

    def _is_interactive():
        """ Tell whether `__main__` lacks `__file__` (REPL, IPython notebook) """
        main_module = __import__('__main__', None, None, fromlist=['__file__'])
        return not hasattr(main_module, '__file__')

    search_from_cwd = usecwd or _is_interactive() or getattr(sys, 'frozen', False)
    if search_from_cwd:
        # No usable __file__ here (e.g. REPL or notebook), so fall back to cwd.
        start_dir = os.getcwd()
    else:
        # Regular .py execution: start from the caller's own directory.
        this_file = __file__
        if PY2 and not this_file.endswith('.py'):
            # On Python 2 __file__ may point at the .pyc/.pyo, while frames
            # report the .py source (non-standard extensions are not handled).
            this_file = this_file.rsplit('.', 1)[0] + '.py'

        # Skip every frame that still belongs to this file.
        caller = sys._getframe()
        while caller.f_code.co_filename == this_file:
            assert caller.f_back is not None
            caller = caller.f_back
        caller_filename = caller.f_code.co_filename
        start_dir = os.path.dirname(os.path.abspath(caller_filename))

    for directory in _walk_to_root(start_dir):
        candidate = os.path.join(directory, filename)
        if os.path.isfile(candidate):
            return candidate

    if raise_error_if_not_found:
        raise IOError('File not found')

    return ''
def load_dotenv(
    dotenv_path=None,
    stream=None,
    verbose=False,
    override=False,
    interpolate=True,
    encoding="utf-8",
):
    # type: (Union[Text, _PathLike, None], Optional[_StringIO], bool, bool, bool, Optional[Text]) -> bool  # noqa
    """Read a .env source and export the variables it defines to the environment.

    - *dotenv_path*: absolute or relative path to .env file.
    - *stream*: `StringIO` object with .env content, used if `dotenv_path` is `None`.
    - *verbose*: whether to log when the .env file is missing. Defaults to `False`.
    - *override*: whether variables from the .env file replace ones already set
      in the system environment. Defaults to `False`.
    - *interpolate*: whether variable references inside values are resolved.
      Defaults to `True`.
    - *encoding*: encoding to be used to read the file.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to
    find the .env file.
    """
    source = dotenv_path or stream
    if not source:
        # Neither was supplied (or both were falsy): search for the file.
        source = find_dotenv()

    loader = DotEnv(
        source,
        verbose=verbose,
        interpolate=interpolate,
        override=override,
        encoding=encoding,
    )
    return loader.set_as_environment_variables()
def dotenv_values(
    dotenv_path=None,
    stream=None,
    verbose=False,
    interpolate=True,
    encoding="utf-8",
):
    # type: (Union[Text, _PathLike, None], Optional[_StringIO], bool, bool, Optional[Text]) -> Dict[Text, Optional[Text]]  # noqa: E501
    """
    Read a .env source and return its bindings as a dict, leaving the process
    environment unmodified.

    - *dotenv_path*: absolute or relative path to .env file.
    - *stream*: `StringIO` object with .env content, used if `dotenv_path` is `None`.
    - *verbose*: whether to log when the .env file is missing. Defaults to `False`.
    - *interpolate*: whether variable references inside values are resolved.
      Defaults to `True`.
    - *encoding*: encoding to be used to read the file.

    If both `dotenv_path` and `stream` are `None`, `find_dotenv()` is used to
    find the .env file.
    """
    source = dotenv_path or stream
    if not source:
        # Neither was supplied (or both were falsy): search for the file.
        source = find_dotenv()

    reader = DotEnv(
        source,
        verbose=verbose,
        interpolate=interpolate,
        override=True,
        encoding=encoding,
    )
    return reader.dict()