from flask import Flask
app = Flask(__name__)
@app.route('/')
def home():
return "Hello, Flask!"
app.run()pip install flask pip show flask
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello():
return "Welcome to Flask!"
if __name__ == '__main__':
app.run(debug=True)project/ ├── app.py ├── templates/ └── static/
export FLASK_APP=app.py flask run
@app.route('/user/')
def greet(name):
return f"Hello, {name}!"
if __name__ == '__main__':
app.run(debug=True)
from flask import render_template
@app.route('/')
def home():
return render_template('index.html')
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
@app.route('/profile/')
def show_profile(username):
return f"Profile: {username}"
@app.route('/item/')
def item(item_id):
return f"Item number {item_id}"
url_for('show_profile', username='john')
from flask import redirect, url_for
return redirect(url_for('login'))
@app.route('/submit', methods=['POST'])
def submit():
return "Form submitted!"
@app.errorhandler(404)
def page_not_found(e):
return "Page Not Found", 404
from flask import jsonify
@app.route('/data')
def data():
return jsonify({"name": "John", "age": 30})
from flask import request
@app.route('/search')
def search():
q = request.args.get('q')
return f"Searching for {q}"
from flask import Response
return Response("Custom response", status=200)
<!-- templates/index.html -->
<h1>Hello, {{ name }}!</h1>
from flask import render_template
@app.route('/hello')
def hello():
return render_template('index.html', name='Alice')
<ul>
{% for item in items %}
<li>{{ item }}</li>
{% endfor %}
</ul>
<!-- base.html -->
<html>
<body>
{% block content %}{% endblock %}
</body>
</html>
<!-- home.html -->
{% extends 'base.html' %}
{% block content %}
<h1>Home Page</h1>
{% endblock %}
<p>{{ 'hello'|capitalize }}</p> <!-- Outputs: Hello -->
{% macro input(name, value='') %}
<input type="text" name="{{ name }}" value="{{ value }}">
{% endmacro %}
{{ input('username') }}
{{ user_input }} <!-- Escaped by default -->
{{ user_input|safe }} <!-- Render as raw HTML -->
app.run(debug=True)
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
from flask import request
@app.route('/submit', methods=['POST'])
def submit():
name = request.form['name']
return f"Hello, {name}!"
@app.route('/search', methods=['GET'])
def search():
query = request.args.get('q')
return f"Search: {query}"
<form action="/submit" method="POST">
<input type="text" name="name">
<input type="submit" value="Send">
</form>
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
class NameForm(FlaskForm):
name = StringField('Name')
submit = SubmitField('Submit')
app.config['SECRET_KEY'] = 'your_secret_key'
from wtforms.validators import DataRequired
name = StringField('Name', validators=[DataRequired()])
from werkzeug.utils import secure_filename
file = request.files['file']
filename = secure_filename(file.filename)
file.save('/path/to/uploads/' + filename)
from flask import redirect, url_for
return redirect(url_for('home'))
from flask import flash, get_flashed_messages
flash('Form submitted successfully!')
pip install flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
flask db init
flask db migrate -m "Initial migration"
flask db upgrade
from flask_login import LoginManager, login_user, login_required
login_manager = LoginManager(app)
@app.route('/dashboard')
@login_required
def dashboard():
return "Welcome to your dashboard"
from flask_mail import Mail, Message
mail = Mail(app)
msg = Message("Hello", recipients=["user@example.com"])
mail.send(msg)
from flask_caching import Cache
cache = Cache(app)
@cache.cached(timeout=60)
def get_data():
return expensive_computation()
from flask_restful import Api, Resource
api = Api(app)
class HelloWorld(Resource):
def get(self):
return {'hello': 'world'}
api.add_resource(HelloWorld, '/')
pip install flask-login flask-mail flask-migrate
app.config['SECRET_KEY'] = 'your_secret_key'
app.config['MAIL_SERVER'] = 'smtp.example.com'
# Example: User info stored in a database instead of memory.
# SQL: SELECT * FROM users WHERE id=1;
# NoSQL: db.users.find({id:1})
import sqlite3
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)')
conn.commit()
conn.close()
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(80))
user = User(name='Alice')
db.session.add(user)
db.session.commit()
users = User.query.all()
for u in users:
print(u.name)
flask db migrate -m "Add user table"
flask db upgrade
# Handled internally by SQLAlchemy; no code needed here.
try:
user = User(name='Bob')
db.session.add(user)
db.session.commit()
except:
db.session.rollback()
import os
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL')
# Verify user credentials before granting access
from flask_login import LoginManager
login_manager = LoginManager()
login_manager.init_app(app)
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True)
from flask_login import login_user
@app.route('/login', methods=['GET', 'POST'])
def login():
# validate form and login_user()
pass
from flask_login import logout_user
@app.route('/logout')
def logout():
logout_user()
return redirect(url_for('home'))
from flask_login import login_required
@app.route('/dashboard')
@login_required
def dashboard():
return "User dashboard"
login_user(user, remember=True)
@login_manager.unauthorized_handler
def unauthorized():
return redirect(url_for('login'))
from werkzeug.security import generate_password_hash, check_password_hash
hashed = generate_password_hash('mypassword')
check_password_hash(hashed, 'mypassword') # returns True
# API endpoints serve JSON data
from flask_restful import Api
api = Api(app)
from flask_restful import Resource
class HelloWorld(Resource):
def get(self):
return {'hello': 'world'}
api.add_resource(HelloWorld, '/')
from flask import request
class Item(Resource):
def post(self):
data = request.get_json()
return {'you sent': data}, 201
return {'message': 'Created'}, 201
return {'error': 'Not found'}, 404
from flask_restful import reqparse
parser = reqparse.RequestParser()
parser.add_argument('name', required=True)
args = parser.parse_args()
try:
# do something
except Exception as e:
return {'error': str(e)}, 500
from flask_httpauth import HTTPTokenAuth
auth = HTTPTokenAuth(scheme='Bearer')
api.add_resource(HelloWorld, '/api/v1/hello')
{% macro button(text) %}
<button>{{ text }}</button>
{% endmacro %}
{{ button('Click me') }}
{{ 'Hello'|lower }} <!-- outputs 'hello' -->
@app.template_global()
def greet(name):
return f"Hello, {name}"
{{ greet('Alice') }}
@app.context_processor
def inject_user():
return dict(user='Alice')
{% if user is defined %}Welcome {{ user }}{% endif %}
{{ user_input|safe }}
{% extends "base.html" %}
{% block content %}Page content here{% endblock %}
{% include "header.html" %}
{% for item in items %}
{{ loop.index }}: {{ item }}
{% endfor %}
from flask import session
session['user'] = 'Alice'
resp = make_response("Set cookie")
resp.set_cookie('username', 'Alice')
return resp
app.secret_key = 'your_secret_key'
session['cart'] = ['item1', 'item2']
session.pop('cart', None)
from datetime import timedelta
app.permanent_session_lifetime = timedelta(days=7)
session.permanent = True
# Use HTTPS and SECRET_KEY for secure sessions
from flask_session import Session
app.config['SESSION_TYPE'] = 'filesystem'
Session(app)
<p>Logged in as: {{ session['user'] }}</p>
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('home'))
# Store user info, posts, etc.
# Example: SQLite (SQL), MongoDB (NoSQL)
import sqlite3
conn = sqlite3.connect('database.db')
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50))
flask db init
flask db migrate
flask db upgrade
user = User(name='Alice')
db.session.add(user)
db.session.commit()
users = User.query.filter_by(name='Alice').all()
class Post(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user = db.relationship('User', backref='posts')
try:
db.session.add(new_user)
db.session.commit()
except:
db.session.rollback()
<form method="POST"></form>
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
class LoginForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
submit = SubmitField('Login')
<form method="POST">
{{ form.csrf_token }}
{{ form.username.label }} {{ form.username() }}
{{ form.submit() }}
</form>
form = LoginForm()
if form.validate_on_submit():
username = form.username.data
def validate_username(form, field):
if ' ' in field.data:
raise ValidationError('No spaces allowed')
from flask_wtf.file import FileField, FileAllowed
class UploadForm(FlaskForm):
file = FileField('Upload', validators=[FileAllowed(['jpg', 'png'])])
from flask import flash
flash('Login successful')
app.secret_key = 'your_secret_key'
file = request.files['file']
file.save('/path/to/save')
app.config['UPLOAD_FOLDER'] = 'uploads/'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
def allowed_file(filename):
return '.' in filename and filename.rsplit('.',1)[1].lower() in ALLOWED_EXTENSIONS
from werkzeug.utils import secure_filename
filename = secure_filename(file.filename)
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
<form method="POST" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Upload">
</form>
from flask import send_from_directory
@app.route('/download/<filename>')
def download(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
# Validate and sanitize uploads
files = request.files.getlist('files')
for f in files:
f.save(os.path.join(app.config['UPLOAD_FOLDER'], f.filename))
# Modular app components
from flask import Blueprint
bp = Blueprint('bp', __name__)
@bp.route('/')
def index():
return "Hello from blueprint"
app.register_blueprint(bp, url_prefix='/bp')
# bp_auth.py, bp_blog.py, etc.
bp = Blueprint('bp', __name__, template_folder='templates', static_folder='static')
url_for('bp.index')
@bp.errorhandler(404)
def not_found(e):
return "Not Found", 404
@bp.before_request
def before():
pass
@bp.context_processor
def inject_data():
return dict(bp_name='My Blueprint')
# Host app on web servers
app.run(debug=False)
gunicorn app:app
web: gunicorn app:app
eb init
eb create
import os
SECRET_KEY = os.environ.get('SECRET_KEY')
# Nginx proxy configuration example
import logging
logging.basicConfig(level=logging.INFO)
gunicorn -w 4 app:app
# Verify who the user is before granting access
from flask_login import LoginManager
login_manager = LoginManager()
login_manager.init_app(app)
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
# Flask-Login needs is_authenticated, etc. from UserMixin
from flask_login import login_user
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data).first()
if user and user.check_password(form.password.data):
login_user(user)
return redirect(url_for('dashboard'))
return render_template('login.html', form=form)
from flask_login import login_required
@app.route('/dashboard')
@login_required
def dashboard():
return "Welcome to your dashboard"
from flask_login import logout_user
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('home'))
from werkzeug.security import generate_password_hash, check_password_hash
hashed = generate_password_hash('mypassword')
check_password_hash(hashed, 'mypassword') # returns True
login_user(user, remember=True)
@login_manager.unauthorized_handler
def unauthorized():
return redirect(url_for('login'))
# REST stands for Representational State Transfer
from flask_restful import Api, Resource
api = Api(app)
class HelloResource(Resource):
def get(self):
return {'message': 'Hello, World!'}
api.add_resource(HelloResource, '/hello')
from flask import request
class EchoResource(Resource):
def post(self):
data = request.get_json()
return {'you_sent': data}
@app.route('/search')
def search():
term = request.args.get('term')
return {'result': f'Searching for {term}'}
from flask_restful import abort
def get_item(item_id):
if not item_exists(item_id):
abort(404, message="Item not found")
from flask_httpauth import HTTPBasicAuth
auth = HTTPBasicAuth()
@auth.verify_password
def verify(username, password):
# Verify credentials here
return True
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 10))
items = Item.query.paginate(page, per_page, False)
api.add_resource(HelloResource, '/v1/hello')
# Automated tests save time and errors
import unittest
class MyTest(unittest.TestCase):
def test_example(self):
self.assertEqual(2 + 2, 4)
with app.test_client() as client:
response = client.get('/')
assert response.status_code == 200
def test_home():
with app.test_client() as client:
response = client.get('/')
assert b"Welcome" in response.data
class MyTest(unittest.TestCase):
def setUp(self):
self.client = app.test_client()
def tearDown(self):
pass
response = client.post('/login', data={'username': 'test', 'password': '1234'})
assert response.status_code == 200
from unittest.mock import patch
@patch('app.send_email')
def test_email(mock_send):
mock_send.return_value = True
# Example GitHub Actions workflow file
coverage run -m unittest discover
coverage report
# Examples: Flask-Login, Flask-Migrate, Flask-Mail
pip install flask-login flask-migrate flask-mail
from flask_mail import Mail, Message
mail = Mail(app)
flask db migrate
flask db upgrade
from flask_caching import Cache
cache = Cache(app)
from flask_admin import Admin
admin = Admin(app)
from flask_wtf import FlaskForm
from flask_restful import Api
api = Api(app)
from flask_security import Security
from flask import signals
def handler(sender):
print("Signal received")
signals.request_started.connect(handler)
@app.cli.command()
def hello():
print("Hello from custom CLI")
from flask_socketio import SocketIO
socketio = SocketIO(app)
from celery import Celery
celery = Celery(app.name)
with app.app_context():
# do something
from flask import request_started
def log_request(sender, **extra):
print('Request started')
request_started.connect(log_request, app)
from flask_cors import CORS
CORS(app)
# Dockerfile example
FROM python:3.9
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["gunicorn", "app:app"]
import cProfile
profiler = cProfile.Profile()
profiler.enable()
# your code
profiler.disable()
profiler.print_stats()
# Examples: SQLite, PostgreSQL, MySQL
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
user = User(username='alice')
db.session.add(user)
db.session.commit()
flask db init
flask db migrate -m "Initial migration"
flask db upgrade
users = User.query.filter_by(username='alice').all()
class Post(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user = db.relationship('User', backref='posts')
try:
db.session.commit()
except:
db.session.rollback()
# Use eager loading to reduce queries
posts = Post.query.options(db.joinedload('user')).all()
# Allows splitting app into reusable parts
from flask import Blueprint
bp = Blueprint('bp', __name__)
@bp.route('/hello')
def hello():
return "Hello from Blueprint"
app.register_blueprint(bp, url_prefix='/bp')
bp = Blueprint('bp', __name__, template_folder='templates', static_folder='static')
@bp.route('/dashboard')
def dashboard():
return "Blueprint dashboard"
api_bp = Blueprint('api', __name__, url_prefix='/api')
@bp.errorhandler(404)
def not_found(e):
return "Blueprint 404 Error", 404
@bp.before_request
def before():
print("Before blueprint request")
with app.test_client() as client:
response = client.get('/bp/hello')
assert response.status_code == 200
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField
class LoginForm(FlaskForm):
username = StringField('Username')
password = PasswordField('Password')
{{ form.csrf_token }}
{{ form.username.label }} {{ form.username() }}
if form.validate_on_submit():
# process data
@app.route('/login', methods=['GET', 'POST'])
def login():
form = LoginForm()
if form.validate_on_submit():
# login logic
return redirect('/')
return render_template('login.html', form=form)
from wtforms.validators import ValidationError
def validate_username(form, field):
if ' ' in field.data:
raise ValidationError('No spaces allowed')
from flask import flash
flash('Login successful')
from flask_wtf.file import FileField, FileAllowed
class UploadForm(FlaskForm):
photo = FileField('Photo', validators=[FileAllowed(['jpg', 'png'])])
{{ form.csrf_token }}
# Examples: Heroku git push, AWS Elastic Beanstalk
gunicorn app:app
export SECRET_KEY='mysecretkey'
# Example Nginx config snippet
proxy_pass http://127.0.0.1:8000;
# Certbot command example
certbot --nginx -d example.com
import logging
logging.basicConfig(filename='app.log', level=logging.INFO)
gunicorn -w 4 app:app
# Use PostgreSQL or managed DB services
# Configure Nginx to serve /static path
# Follow best practices to avoid attacks
{{ user_input | e }}
{{ form.csrf_token }}
# SQLAlchemy safely escapes queries
User.query.filter_by(username=input_username).first()
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SECURE'] = True
from werkzeug.security import generate_password_hash
hashed = generate_password_hash('mypassword')
from flask_limiter import Limiter
limiter = Limiter(app)
# Redirect HTTP to HTTPS
from flask_talisman import Talisman
Talisman(app)
with open('sample.txt', 'r') as file:
content = file.read()
print(content)
with open('output.txt', 'w') as file:
file.write("Hello, Flask!")
from flask import request
@app.route('/upload', methods=['POST'])
def upload():
f = request.files['file']
f.save('uploads/' + f.filename)
return "File saved!"
from flask import send_from_directory
@app.route('/uploads/<filename>')
def uploaded_file(filename):
return send_from_directory('uploads', filename)
import os
safe_path = os.path.join('uploads', filename)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
ALLOWED_EXTENSIONS = {'png', 'jpg', 'txt'}
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
import os
if os.path.exists('uploads/old_file.txt'):
os.remove('uploads/old_file.txt')
from flask import Response
def generate():
with open('largefile.txt') as f:
for line in f:
yield line
@app.route('/stream')
def stream():
return Response(generate())
from flask import session
session['username'] = 'Alice'
session['email'] = 'alice@example.com'
email = session.get('email')
session.pop('username', None)
session.clear()
from flask import make_response
resp = make_response("Cookie set")
resp.set_cookie('theme', 'dark')
return resp
theme = request.cookies.get('theme')
app.config['SECRET_KEY'] = 'your-secret-key'
from datetime import timedelta
app.permanent_session_lifetime = timedelta(days=7)
from flask_login import LoginManager
login_manager = LoginManager(app)
app.config['SESSION_COOKIE_SECURE'] = True
# REST APIs are stateless and use JSON
from flask import jsonify
@app.route('/api/data', methods=['GET'])
def get_data():
return jsonify({'message': 'Hello API'})
@app.route('/api/data', methods=['POST'])
def post_data():
data = request.get_json()
return jsonify(data), 201
from flask_restful import Resource, Api
api = Api(app)
class Hello(Resource):
def get(self):
return {'hello': 'world'}
api.add_resource(Hello, '/hello')
from flask import abort
if not valid:
abort(400, 'Invalid request')
from flask_httpauth import HTTPTokenAuth
auth = HTTPTokenAuth('Bearer')
page = request.args.get('page', 1, type=int)
/api/v1/resource
/api/v2/resource
with app.test_client() as client:
response = client.get('/api/data')
assert response.status_code == 200
app.run(debug=True)
import logging
logging.basicConfig(filename='app.log', level=logging.ERROR)
import pdb; pdb.set_trace()
try:
risky_code()
except Exception as e:
app.logger.error(e)
flask shell
from flask_debugtoolbar import DebugToolbarExtension
toolbar = DebugToolbarExtension(app)
import cProfile
cProfile.run('app.run()')
import unittest
class MyTest(unittest.TestCase):
def test_home(self):
tester = app.test_client()
response = tester.get('/')
self.assertEqual(response.status_code, 200)
# Check logs in /var/log or cloud platform
@app.context_processor
def inject_user():
return dict(user='Alice')
def reverse_string(s):
return s[::-1]
app.jinja_env.filters['reverse'] = reverse_string
from flask.signals import request_started
def log_request(sender, **extra):
print("Request started")
request_started.connect(log_request)
from flask_caching import Cache
cache = Cache(app)
@cache.cached(timeout=60)
def get_data():
# expensive call
pass
@app.shell_context_processor
def make_shell_context():
return dict(app=app, db=db, User=User)
app.config.from_object('config.DevelopmentConfig')
class Middleware:
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
# modify environ or headers
return self.app(environ, start_response)
app.wsgi_app = Middleware(app.wsgi_app)
from flask.signals import request_finished
def log_response(sender, response, **extra):
print("Response sent")
request_finished.connect(log_response)
{% raw %}
{% extends "base.html" %}
{% block content %}
Hello
{% endblock %}
{% endraw %}
from flask import Blueprint
bp = Blueprint('bp', __name__)
@bp.route('/hello')
def hello():
return "Hello from Blueprint"
app.register_blueprint(bp, url_prefix='/bp')
bp = Blueprint('bp', __name__, template_folder='templates')
app.register_blueprint(bp, url_prefix='/admin')
bp = Blueprint('bp', __name__, static_folder='static')
def create_app():
app = Flask(__name__)
app.register_blueprint(bp)
return app
@bp.errorhandler(404)
def not_found(e):
return "Not found in blueprint", 404
@bp.before_request
def before():
print("Before blueprint request")
{% raw %}
{% extends "base.html" %}
{% block content %}
Blueprint content here
{% endblock %}
{% endraw %}
from flask_wtf import FlaskForm
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired
class NameForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()])
submit = SubmitField('Submit')
<form method="POST">
{{ form.hidden_tag() }}
{{ form.name.label }} {{ form.name() }}
{{ form.submit() }}
</form>
@app.route('/submit', methods=['GET', 'POST'])
def submit():
form = NameForm()
if form.validate_on_submit():
name = form.name.data
return f"Hello, {name}!"
return render_template('form.html', form=form)
app.config['SECRET_KEY'] = 'your-secret-key'
from wtforms.validators import ValidationError
def must_be_hello(form, field):
if field.data != 'hello':
raise ValidationError('Must be hello!')
from flask_wtf.file import FileField, FileAllowed
class UploadForm(FlaskForm):
photo = FileField('Photo', validators=[FileAllowed(['jpg', 'png'])])
{{ form.name(class="form-control", placeholder="Enter name") }}
from flask import flash
flash('Form submitted successfully!')
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///site.db'
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(20), unique=True, nullable=False)
with app.app_context():
db.create_all()
user = User(username='Alice')
db.session.add(user)
db.session.commit()
users = User.query.all()
user = User.query.filter_by(username='Alice').first()
user.username = 'Bob'
db.session.commit()
db.session.delete(user)
db.session.commit()
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user = db.relationship('User', backref='posts')
# Avoid manual schema changes
pip install flask-migrate
from flask_migrate import Migrate
migrate = Migrate(app, db)
flask db init
flask db migrate -m "Initial migration"
flask db upgrade
flask db downgrade
# Modify model.py, then:
flask db migrate -m "Add new field"
def upgrade():
pass
def downgrade():
pass
# Use descriptive commit messages
app.run(debug=False)
pip freeze > requirements.txt
gunicorn app:app
export FLASK_ENV=production
export SECRET_KEY='your-secret-key'
web: gunicorn app:app
FROM python:3.9
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
CMD ["gunicorn", "app:app"]
# Example nginx config snippet
location / {
proxy_pass http://127.0.0.1:8000;
}
tail -f /var/log/myapp.log
gunicorn -w 4 app:app
# Use tools like Flask-Talisman for HTTPS
from flask_login import LoginManager
login_manager = LoginManager()
login_manager.init_app(app)
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
from flask_login import login_user, logout_user
@app.route('/login', methods=['GET', 'POST'])
def login():
# Authenticate and login user
login_user(user)
return redirect('/dashboard')
@app.route('/logout')
def logout():
logout_user()
return redirect('/')
from flask_login import login_required
@app.route('/dashboard')
@login_required
def dashboard():
return "Welcome to your dashboard"
from werkzeug.security import generate_password_hash, check_password_hash
hashed_password = generate_password_hash('mypassword')
login_user(user, remember=True)
def admin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not current_user.is_admin:
abort(403)
return f(*args, **kwargs)
return decorated
from flask_dance.contrib.google import make_google_blueprint
google_bp = make_google_blueprint(client_id="xxx", client_secret="yyy")
app.register_blueprint(google_bp, url_prefix="/login")
app.config['REMEMBER_COOKIE_DURATION'] = timedelta(days=7)
GET /users # Retrieve users
POST /users # Create user
pip install flask-restful
from flask_restful import Resource
class UserResource(Resource):
def get(self):
return {"users": ["Alice", "Bob"]}
from flask_restful import Api
api = Api(app)
api.add_resource(UserResource, '/users')
def post(self):
data = request.get_json()
# create new user
return {"message": "User created"}, 201
from flask_restful import reqparse
parser = reqparse.RequestParser()
parser.add_argument('name', required=True)
args = parser.parse_args()
from flask_marshmallow import Marshmallow
ma = Marshmallow(app)
class UserSchema(ma.Schema):
class Meta:
fields = ('id', 'username')
from flask_restful import abort
if not user:
abort(404, message="User not found")
api.add_resource(UserResource, '/api/v1/users')
# Always write tests for your app
import pytest
@pytest.fixture
def client():
app.config['TESTING'] = True
with app.test_client() as client:
yield client
def test_home(client):
rv = client.get('/')
assert rv.status_code == 200
assert b"Welcome" in rv.data
def test_submit_form(client):
rv = client.post('/submit', data={'name': 'Alice'})
assert b"Hello, Alice" in rv.data
from unittest.mock import patch
@patch('requests.get')
def test_api(mock_get, client):
mock_get.return_value.json.return_value = {'key': 'value'}
# test your endpoint
pip install flask-debugtoolbar
import pdb; pdb.set_trace()
import logging
logging.basicConfig(level=logging.DEBUG)
def test_full_flow(client):
# register, login, access dashboard
pass
# WebSocket keeps connection open
pip install flask-socketio
from flask_socketio import SocketIO
socketio = SocketIO(app)
@socketio.on('message')
def handle_message(msg):
print('Received:', msg)
socketio.send('Reply: ' + msg)
socketio.emit('update', data, broadcast=True)
join_room('room1')
leave_room('room1')
<script src="//cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>
<script>
var socket = io();
socket.on('message', function(msg) {
console.log(msg);
});
</script>
pip install eventlet
socketio.run(app, host='0.0.0.0', port=5000)
# Real-time notifications, live data feeds
app.config['DEBUG'] = True
class Config:
DEBUG = False
class DevConfig(Config):
DEBUG = True
app.config.from_pyfile('config.py')
import os
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY')
pip install flask-environments
pip install python-dotenv
from dotenv import load_dotenv
load_dotenv()
if app.config['DEBUG']:
# Dev-only settings
import logging
logging.basicConfig(level=logging.INFO)
# Use .env and config.py wisely
# Install with:
pip install flask_sqlalchemy
from flask_sqlalchemy import SQLAlchemy
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
from flask_migrate import Migrate
migrate = Migrate(app, db)
flask db init
flask db migrate
flask db upgrade
user = User(username='Alice')
db.session.add(user)
db.session.commit()
users = User.query.all()
class Post(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user = db.relationship('User', back_populates='posts')
user.username = 'Bob'
db.session.commit()
db.session.delete(user)
db.session.commit()
users = User.query.filter(User.username.like('A%')).all()
try:
db.session.add(user)
db.session.commit()
except:
db.session.rollback()
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField
from wtforms.validators import DataRequired
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
password = PasswordField('Password', validators=[DataRequired()])
@app.route('/register', methods=['GET', 'POST'])
def register():
form = RegistrationForm()
if form.validate_on_submit():
# Save user
return redirect('/')
return render_template('register.html', form=form)
from werkzeug.security import generate_password_hash, check_password_hash
hashed_pw = generate_password_hash(form.password.data)
from flask_login import login_user
user = User.query.filter_by(username=form.username.data).first()
if user and check_password_hash(user.password, form.password.data):
login_user(user)
@app.route('/profile')
@login_required
def profile():
return render_template('profile.html', user=current_user)
@app.route('/profile/edit', methods=['GET', 'POST'])
@login_required
def edit_profile():
form = ProfileForm(obj=current_user)
if form.validate_on_submit():
form.populate_obj(current_user)
db.session.commit()
flash('Profile updated')
return redirect('/profile')
from werkzeug.utils import secure_filename
file = request.files['photo']
filename = secure_filename(file.filename)
file.save(os.path.join('static/uploads', filename))
# Use itsdangerous for token generation
from itsdangerous import URLSafeTimedSerializer
if current_user.role != 'admin':
abort(403)
from flask_mail import Mail, Message
mail = Mail(app)
app.config.update(
MAIL_SERVER='smtp.gmail.com',
MAIL_PORT=587,
MAIL_USE_TLS=True,
MAIL_USERNAME='your_email@gmail.com',
MAIL_PASSWORD='your_password'
)
msg = Message("Hello", sender="you@gmail.com", recipients=["friend@gmail.com"])
msg.body = "This is a test email"
mail.send(msg)
msg.html = "<b>Bold text</b> and <i>italics</i>"
with app.open_resource("file.pdf") as fp:
msg.attach("file.pdf", "application/pdf", fp.read())
html = render_template('email.html', name=user.name)
msg.html = html
# Generate token and send URL to user
# Send tokenized reset email similar to confirmation
try:
mail.send(msg)
except Exception as e:
app.logger.error(f"Email error: {e}")
file = request.files['file']
file.save('/path/to/save/' + file.filename)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'pdf'}
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
from werkzeug.utils import secure_filename
filename = secure_filename(file.filename)
UPLOAD_FOLDER = '/uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
files = request.files.getlist('files')
for f in files:
f.save(os.path.join(app.config['UPLOAD_FOLDER'], secure_filename(f.filename)))
from flask import send_from_directory
@app.route('/download/<filename>')
def download(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB
@app.errorhandler(413)
def too_large(e):
return "File too large", 413
# Always validate and sanitize inputs
app.config['DEBUG'] = False
gunicorn app:app
# Nginx example
location / {
proxy_pass http://127.0.0.1:8000;
}
export SECRET_KEY='your_secret'
sudo certbot --nginx -d yourdomain.com
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL')
import logging
logging.basicConfig(filename='app.log', level=logging.WARNING)
gunicorn -w 4 app:app
# Regularly update packages
pip install --upgrade -r requirements.txt
# REST example: GET, POST, PUT, DELETE
pip install flask-restful
from flask_restful import Api, Resource
api = Api(app)
class Hello(Resource):
def get(self):
return {'message': 'Hello World'}
api.add_resource(Hello, '/hello')
from flask import request
class Item(Resource):
def post(self):
data = request.get_json()
return {'you sent': data}, 201
class User(Resource):
def get(self, user_id):
return {'user_id': user_id}
api.add_resource(User, '/user/<int:user_id>')
from flask_restful import abort
if not user:
abort(404, message="User not found")
from flask_httpauth import HTTPBasicAuth
auth = HTTPBasicAuth()
page = int(request.args.get('page', 1))
items = Model.query.paginate(page, 10)
api.add_resource(Hello, '/v1/hello')
# Unlike HTTP, WebSockets stay open for push updates.
pip install flask-socketio
from flask_socketio import SocketIO
socketio = SocketIO(app)
@socketio.on('message')
def handle_message(msg):
print('Message: ' + msg)
socketio.emit('response', {'data': 'Hello clients!'})
@socketio.on('connect')
def handle_connect():
print('Client connected')
socketio.emit('update', {'data': 'Broadcast message'}, broadcast=True)
join_room('room1')
leave_room('room1')
var socket = io();
socket.on('response', function(data) { console.log(data); });
socketio.run(app, host='0.0.0.0', port=5000)
# Automated tests improve code quality.
pip install pytest
def test_add():
assert 1 + 1 == 2
def test_home():
tester = app.test_client()
response = tester.get('/')
assert response.status_code == 200
@pytest.fixture
def client():
with app.test_client() as client:
yield client
from unittest.mock import patch
@patch('module.function')
def test_mock(mock_func):
mock_func.return_value = True
app.run(debug=True)
# Use verbose pytest output
pytest -v
# .github/workflows/python-app.yml example
# Cache static and dynamic content.
pip install Flask-Caching
from flask_caching import Cache
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
@cache.cached(timeout=60)
@app.route('/data')
def data():
return expensive_function()
@cache.memoize(50)
def get_user(user_id):
# Expensive DB call
pass
cache.delete_memoized(get_user, user_id)
cache = Cache(app, config={'CACHE_TYPE': 'redis', 'CACHE_REDIS_URL': 'redis://localhost:6379/0'})
# Jinja2 extensions support caching blocks
response.headers['Cache-Control'] = 'public, max-age=3600'
# Enable debug logging for cache
pip install Flask-Migrate
from flask_login import LoginManager
login_manager = LoginManager(app)
from flask_wtf import FlaskForm
from flask_mail import Mail
mail = Mail(app)
from flask_socketio import SocketIO
socketio = SocketIO(app)
from flask_caching import Cache
cache = Cache(app)
from flask_admin import Admin
admin = Admin(app)
from flask_babel import Babel
babel = Babel(app)
from flask_uploads import UploadSet, configure_uploads