feature/ssh-server (#1)
All checks were successful
/ ssh-client (push) Successful in 10s
/ ssh-server (push) Successful in 10s

Co-authored-by: Márcio Fernandes <marcio.fernandes@outlook.pt>
Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
2025-09-07 13:50:18 +00:00
parent ea24e0e41a
commit ce1d7a749a
23 changed files with 727 additions and 10 deletions

1
docker/ssh-server/app/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
__pycache__

View File

@@ -0,0 +1,28 @@
import os
import yaml
def is_debugging(): return os.getenv("CONFIGURATION", "").lower() == "debug"
file_path="/etc/app/config/config.yaml"
config=None
def config_exits():
return get_config() is not None
def sshserver_enabled():
return not is_debugging() or os.getenv("SSH_SERVER_ENABLED", "false").lower() == "true"
def get_config():
global config
if config == None:
load_config()
return config
def load_config():
global config
if os.path.exists(file_path):
with open(file_path, 'r') as f:
config = yaml.safe_load(f)
else:
print(f"⚠️ missing " + file_path)

View File

@@ -0,0 +1,41 @@
import os
import subprocess
import globals
import sys
def main():
if globals.is_debugging():
import debugpy
debugpy.listen(("0.0.0.0", 5678))
print("INFO: Waiting for debugger to attach...")
debugpy.wait_for_client()
print("INFO: Debugger attached")
else:
print("👉 starting setup")
setup_container()
if globals.is_debugging():
debug_file = os.getenv("DEBUG_FILE", "")
if debug_file and os.path.isfile(debug_file):
print(f"🔍 Running debug file: {debug_file}")
subprocess.run(["python3", debug_file], check=True, stdout=sys.stdout, stderr=sys.stderr)
return False
elif debug_file:
print(f"⚠️ DEBUG_FILE set but file not found: {debug_file}")
return True
def setup_container():
marker_file = "/var/run/container_initialized"
if not os.path.exists(marker_file):
print("INFO: First-time setup running...")
# 👉 Your one-time setup logic here
# Create the marker file
with open(marker_file, "w") as f:
f.write("initialized\n")
else:
print("TRACE: Already initialized. Skipping setup.")

View File

@@ -0,0 +1,14 @@
import globals
import subprocess
import init
import users
import sshserver
if init.main():
users.load()
sshserver.load()

View File

@@ -0,0 +1 @@
PyYAML>=5.4

View File

@@ -0,0 +1,126 @@
import yaml
import subprocess
import crypt
import os
import globals
import sys
config_file_path='/etc/ssh/sshd_config'
def set_sshd_option(file_path: str, key: str, value: str) -> None:
updated = False
lines = []
with open(file_path, 'r') as f:
for line in f:
if line.strip().startswith(key):
lines.append(f"{key} {value}\n")
updated = True
else:
lines.append(line)
if not updated:
lines.append(f"{key} {value}\n")
with open(file_path, 'w') as f:
f.writelines(lines)
print(f"✅ Updated {key} to '{value}' in {file_path}")
def load():
setup()
#print_server_config()
if globals.sshserver_enabled():
start_server()
def setup_certs():
certs=[
"/etc/ssh/certs/ssh_host_rsa_key",
"/etc/ssh/certs/ssh_host_ecdsa_key",
"/etc/ssh/certs/ssh_host_ed25519_key"
]
if not os.path.exists("/etc/ssh/certs"):
os.makedirs("/etc/ssh/certs")
print(f"📁 Created folder: /etc/ssh/certs")
if not os.listdir("/etc/ssh/certs"):
subprocess.run([
"ssh-keygen", "-t", "rsa", "-f",
"/etc/ssh/certs/ssh_host_rsa_key"
], check=True, stdout=sys.stdout, stderr=sys.stderr)
print(f"✅ RSA key and certificate created:🔑 /etc/ssh/certs/ssh_host_rsa_key")
subprocess.run([
"ssh-keygen", "-t", "ecdsa", "-f",
"/etc/ssh/certs/ssh_host_ecdsa_key"
], check=True, stdout=sys.stdout, stderr=sys.stderr)
print(f"✅ RSA key and certificate created:🔑 /etc/ssh/certs/ssh_host_ecdsa_key")
subprocess.run([
"ssh-keygen", "-t", "ed25519", "-f",
"/etc/ssh/certs/ssh_host_ed25519_key"
], check=True, stdout=sys.stdout, stderr=sys.stderr)
print(f"✅ RSA key and certificate created:🔑 /etc/ssh/certs/ssh_host_ed25519_key")
certLines=[]
for cert in certs:
if os.path.exists(cert):
certLines.append(f"HostKey {cert}\n")
else:
print(f"❌ HostKey path not found {cert}")
if not certLines: RuntimeError("❌ Missing server certificates configuration. Bind Volume to /etc/ssh/certs")
lines = []
with open(config_file_path, 'r') as f:
for line in f:
if line.strip().startswith("HostKey"):
continue # remove existing HostKey lines
lines.append(line)
for key in certLines:
print(f"✅ HostKey path updated to use {key}")
lines.append(key)
with open(config_file_path, 'w') as f:
f.writelines(lines)
def setup():
global config_file_path
serverConfig = globals.get_config().get("server") if globals.config_exits() else None
if not serverConfig:
return
optionsConfig = serverConfig.get("options")
if optionsConfig:
for option in optionsConfig:
set_sshd_option(config_file_path, option, optionsConfig[option])
setup_certs()
def print_server_config():
with open(config_file_path, 'r') as f:
content = f.read()
print(content)
def start_server():
print("INFO: Starting ssh server.")
serverPort=None
serverConfig = globals.get_config().get("server") if globals.config_exits() else None
if serverConfig:
serverPort = serverConfig.get("port")
if serverPort:
subprocess.run(["/usr/sbin/sshd", "-D", "-e", "-p", str(serverPort)])
else:
subprocess.run(["/usr/sbin/sshd", "-D", "-e"])
if __name__ == "__main__":
load()

View File

@@ -0,0 +1,89 @@
import yaml
import subprocess
import crypt
import os
import globals
import pwd
# users:
# - username: alice
# authorized_keys: publich ssh key
# - username: bob
# password: hunter2
def user_exists(username):
try:
pwd.getpwnam(username)
return True
except KeyError:
return False
def create_user(uid, username ,password, shell="/bin/bash"):
if not shell: shell = "/bin/bash"
if not username:
return
useradd_cmd = [
'useradd',
'-m',
'-s', shell,
]
if uid: useradd_cmd.append("-u " + str(uid))
if password: useradd_cmd.append("-p" + crypt.crypt(password, crypt.mksalt(crypt.METHOD_SHA512)))
useradd_cmd.append(username)
try:
subprocess.run(useradd_cmd
, check=True)
print(f"✅ User '{username}' created with shell '{shell}' and password.")
except subprocess.CalledProcessError as e:
print(f"❌ Failed to create user '{username}': {e}")
def setup_ssh(username, public_key):
ssh_dir = f"/home/{username}/.ssh"
auth_keys = os.path.join(ssh_dir, "authorized_keys")
uid = pwd.getpwnam(username).pw_uid
gid = pwd.getpwnam(username).pw_gid
os.makedirs(ssh_dir, mode=0o700, exist_ok=True)
# Check if key already exists
key_exists = False
if os.path.exists(auth_keys):
with open(auth_keys, "r") as f:
existing_keys = f.read().splitlines()
key_exists = public_key.strip() in existing_keys
if not key_exists:
with open(auth_keys, "a") as f:
f.write(public_key.strip() + "\n")
print(f"🔐 SSH key added for '{username}'.")
else:
print(f"⚠️ SSH key already exists for '{username}'. Skipping.")
os.chmod(ssh_dir, 0o700)
os.chmod(auth_keys, 0o600)
os.chown(ssh_dir, uid, gid)
os.chown(auth_keys, uid, gid)
def load():
users = globals.get_config()["users"] if globals.config_exits() else None
if users:
for user in users:
if not user_exists(user.get('username')):
create_user(user.get('uid'), user.get('username'),user.get('password'), user.get('shell'))
if user.get('public_keys'):
for public_key in user.get('public_keys'):
setup_ssh(user.get('username'), public_key)
else:
print(f"⚠️ missing users configuration")
if __name__ == "__main__":
load()