Entities validation service
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

107 lines
2.8 KiB

from abc import ABCMeta
import tornado.web
import os
import subprocess
from datetime import datetime
import isodate
import json
import util
from entity_validator import validate
# Captured once at import time; HealthHandler.get subtracts it from the
# current time to report the service uptime.
startup_timestamp = datetime.now()
class HealthHandler(tornado.web.RequestHandler, metaclass=ABCMeta):
    """Serve a JSON health report for the service.

    The report contains the API version, the git version (when it can be
    determined), the current timestamp and the uptime since module import.
    """

    # noinspection PyAttributeOutsideInit
    def initialize(self):
        """Resolve the git version and store it on the handler instance."""
        self.git_version = self._load_git_version()

    @staticmethod
    def _load_git_version():
        """Return the git version string, or ``None`` if it cannot be found.

        The first line of ``git-version.txt`` (relative to the working
        directory) is preferred. If the file is missing or its first line is
        blank, ``git describe --always --dirty`` is tried instead.
        """
        v = None
        # try file git-version.txt first
        gitversion_file = "git-version.txt"
        if os.path.exists(gitversion_file):
            with open(gitversion_file) as f:
                # A blank first line counts as "not available" so that the
                # git fallback below still gets a chance.
                v = f.readline().strip() or None
        # if not available, try git
        if v is None:
            try:
                raw = subprocess.check_output(
                    ["git", "describe", "--always", "--dirty"],
                )
                # check_output returns bytes; json.dumps needs str.
                v = raw.decode().strip()
            except subprocess.CalledProcessError as e:
                print("Checking git version lead to non-null return code ", e.returncode)
            except OSError as e:
                # git is not installed or not executable: report health
                # without a git version rather than failing the request.
                print("Could not run git to determine version: ", e)
        return v

    def get(self):
        """Write the health report as indented JSON."""
        # Sample the clock once so timestamp and uptime describe the same instant.
        now = datetime.now()
        health = dict()
        health['api-version'] = 'v0'
        if self.git_version is not None:
            health['git-version'] = self.git_version
        health['timestamp'] = isodate.datetime_isoformat(now)
        health['uptime'] = isodate.duration_isoformat(now - startup_timestamp)
        self.set_header("Content-Type", "application/json")
        self.write(json.dumps(health, indent=4))
class Oas3Handler(tornado.web.RequestHandler, metaclass=ABCMeta):
    """Serve the contents of ``OAS3.yml`` as plain text."""

    def get(self):
        """Read ``OAS3.yml`` from the working directory and send it to the client."""
        self.set_header("Content-Type", "text/plain")
        # This is the proposed content type,
        # but browsers like Firefox try to download instead of display the content
        # self.set_header("Content-Type", "text/vnd.yml")
        with open('OAS3.yml', 'r') as f:
            oas3 = f.read()
        # The file content was previously read and then discarded; send it.
        self.write(oas3)
class ValidateHandler(tornado.web.RequestHandler, metaclass=ABCMeta):
    """Validate a JSON entity posted in the request body."""

    def post(self):
        """Parse the body as JSON, run ``validate`` on it and return the result.

        Raises:
            tornado.web.HTTPError: 400 when the body is not decodable text
                or not valid JSON.
        """
        self.set_header("Content-Type", "application/json")
        try:
            entity = self.request.body.decode()
            parsed_entity = json.loads(entity)
        except ValueError as e:
            # Covers both UnicodeDecodeError and json.JSONDecodeError: a
            # malformed body is a client error, not a server failure.
            raise tornado.web.HTTPError(
                400, reason="Request body is not valid JSON"
            ) from e
        validation_result = validate(parsed_entity)
        # NOTE(review): validate() is defined in entity_validator and not
        # visible here; this assumes its result is JSON-serializable — confirm.
        self.write(json.dumps(validation_result, indent=4))
def make_app():
    """Build the Tornado application with all versioned routes.

    Every route is prefixed with ``/v<digit>`` (for example ``/v0/health``).

    Returns:
        tornado.web.Application: the configured application.
    """
    version_path = r"/v[0-9]"
    return tornado.web.Application([
        (version_path + r"/health", HealthHandler),
        (version_path + r"/oas3", Oas3Handler),
        (version_path + r"/validate", ValidateHandler),
    ])
def main():
    """Start the service on the configured port and block until it stops.

    The port is read via ``util.load_env('PORT', 8080)``. The call returns
    after the I/O loop exits, which includes interruption with Ctrl-C.
    """
    # Local import: the visible module-level imports only bring in tornado.web.
    import tornado.ioloop

    # NOTE(review): util.load_env is not visible here; int() guards against
    # the value arriving as a string from the environment — confirm.
    port = int(util.load_env('PORT', 8080))

    # Setup
    app = make_app()
    app.listen(port)

    try:
        tornado.ioloop.IOLoop.current().start()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop a foreground server.
        pass

    # Teardown
    print("Server stopped")
if __name__ == "__main__":