Coverage reporting server and plugin for Drone CI.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

179 lines
5.9 KiB

  1. #!/usr/bin/env python3
  2. from flask import Flask, abort, json, render_template, request
  3. import flask
  4. from flask_sqlalchemy import SQLAlchemy
  5. import datetime
  6. import os
  7. MIME_TYPE_SVG = 'image/svg+xml;charset=utf-8'
  8. colormap = {
  9. 'green': '#97ca00',
  10. 'orange': '#fe7d37',
  11. 'red': '#e05d44',
  12. }
  13. app = Flask(__name__)
  14. app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DRONECOV_DB_URI', 'sqlite:///./dronecov.db')
  15. app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
  16. db = SQLAlchemy(app)
  17. class CoverageInfo(db.Model):
  18. id = db.Column(db.Integer, primary_key=True)
  19. username = db.Column(db.String(255), nullable=False)
  20. reponame = db.Column(db.String(255), nullable=False)
  21. branch = db.Column(db.String(255), nullable=False)
  22. build_id = db.Column(db.String(8), nullable=False)
  23. coverage = db.Column(db.Float(), nullable=False)
  24. created_at = db.Column(db.DateTime(), nullable=False, default=datetime.datetime.utcnow)
  25. def __repr__(self):
  26. return '<Coverage %r/%r/%r@%r = %r>' % (self.username, self.reponame, self.branch, self.build_id,self.coverage)
  27. class AccessToken(db.Model):
  28. id = db.Column(db.Integer, primary_key=True)
  29. token = db.Column(db.String(32), unique=True, nullable=False)
  30. name = db.Column(db.String(255), nullable=False)
  31. username = db.Column(db.String(255), nullable=False)
  32. created_at = db.Column(db.DateTime(), nullable=False, default=datetime.datetime.utcnow)
  33. # Test support:
  34. # For the test DB, create all tables without asking
  35. if app.config['SQLALCHEMY_DATABASE_URI'] == 'sqlite:///./tests/tmp.db':
  36. db.create_all()
  37. class UnauthorizedException(Exception):
  38. pass
  39. class TokenUnauthorizedException(Exception):
  40. pass
  41. def coverage_precision(cov):
  42. if cov >= 99.95:
  43. return "100"
  44. if cov >= 9.995:
  45. # return "%d.%01d" % (cov, ((cov-int(cov))*10))
  46. return "%.1f" % cov
  47. return "%.2f" % cov
  48. def format_coverage(cov):
  49. return coverage_precision(cov) + "&#8201;%"
  50. def render_color(cov: float, threshold_warn: float, threshold_error: float) -> str:
  51. if cov <= threshold_error:
  52. return "red"
  53. if cov <= threshold_warn:
  54. return "orange"
  55. return "green"
  56. @app.errorhandler(UnauthorizedException)
  57. def handle_unauthorized(error):
  58. return ("Unauthorized", 401, {})
  59. @app.errorhandler(TokenUnauthorizedException)
  60. def handle_unauthorized(error):
  61. return ("Forbidden", 403, {})
  62. @app.route('/<user>/<repo>/<branch>/coverage.svg')
  63. def get_coverage_svg(user: str, repo: str, branch: str):
  64. try:
  65. threshold_error = float(request.args.get('error', 5))
  66. threshold_warn = float(request.args.get('warn', 80))
  67. except ValueError as e:
  68. return (str(e), 400, {})
  69. cov = db.session.query(CoverageInfo).filter_by(
  70. username=user,
  71. reponame=repo,
  72. branch=branch).order_by(CoverageInfo.created_at.desc()).first()
  73. if cov is not None:
  74. if app.debug and 'cov' in request.args:
  75. cov.coverage = float(request.args.get('cov'))
  76. coverage_string = format_coverage(cov.coverage)
  77. color = colormap[render_color(cov.coverage, threshold_warn, threshold_error)]
  78. else:
  79. coverage_string = 'N/A'
  80. color = colormap['red']
  81. return (render_template('badge-template.svg',
  82. w1=60, w2=54, pad=4,
  83. coverage=coverage_string,
  84. color=color
  85. ), 200, {
  86. 'Content-Type': MIME_TYPE_SVG,
  87. })
  88. AUTH_PREFIX = 'Bearer '
  89. def validate_coverage_report(user: str, repo: str, branch: str, cov_json) -> CoverageInfo:
  90. cov_total = float(cov_json.get('coverage_total'))
  91. build_number = int(cov_json.get('build_number'))
  92. return CoverageInfo(coverage=cov_total,
  93. build_id=build_number,
  94. username=user,
  95. reponame=repo,
  96. branch=branch)
  97. def token_can_access(token: str, user: str, repo: str):
  98. """Check that token can access "user/repo", otherwise throw an exception."""
  99. tk = db.session.query(AccessToken).filter_by(username=user, token=token).first()
  100. if tk is None:
  101. raise TokenUnauthorizedException()
  102. def check_authorization(user: str, repo: str):
  103. auth = request.headers.get('Authorization', '')
  104. token = auth[len(AUTH_PREFIX):]
  105. if not (auth.startswith(AUTH_PREFIX) and len(token) == 32):
  106. raise UnauthorizedException()
  107. return token_can_access(token, user, repo)
  108. @app.route('/<user>/<repo>/<branch>/coverage', methods=['POST'])
  109. def update_coverage(user: str, repo: str, branch: str):
  110. check_authorization(user, repo)
  111. try:
  112. cov = validate_coverage_report(user, repo, branch, request.json)
  113. except (TypeError, ValueError) as e:
  114. return (str(e), 400, {})
  115. db.session.add(cov)
  116. db.session.commit()
  117. return ('OK', 201, None)
  118. def generate_token() -> str:
  119. import random
  120. alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
  121. return ''.join(random.choice(alphabet) for _ in range(32))
  122. if __name__ == '__main__':
  123. import sys
  124. if sys.argv[1] == 'init':
  125. db.create_all()
  126. print("DB created.")
  127. elif sys.argv[1] in ['token', 'token-batch']:
  128. user_repo = sys.argv[2]
  129. if '/' not in user_repo:
  130. user_repo += '/*'
  131. user, repo = user_repo.split('/')
  132. if repo not in ['', '*']:
  133. print("warning: repo name is ignored, token is valid for all repos belonging to " + user)
  134. t = AccessToken(username = user,
  135. name = sys.argv[3])
  136. t.token = generate_token()
  137. db.session.add(t)
  138. db.session.commit()
  139. if sys.argv[1] == 'token':
  140. print('Name: %s' % (t.name))
  141. print('Access Token: %s' % (t.token))
  142. print('Valid repos: %s/*' % (t.username))
  143. else:
  144. # Batch mode, print token and nothing else
  145. print(t.token)