274 lines
8.7 KiB
Python
274 lines
8.7 KiB
Python
import logging
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

from flask import (
    Flask,
    jsonify,
    render_template,
    request,
)
|
|
|
|
# Zone applied to naive datetimes when no explicit UTC offset string is given
# (see make_timezone_aware).
LEGACY_TIMEZONE = ZoneInfo("America/New_York")

# The Flask application object; the route below is registered on it.
app = Flask(__name__)

# Path of the SQLite database file. It is relative, so it is resolved against
# the working directory of the running process.
DATABASE = "database/timeclock.db"
|
|
|
|
def is_timezone_aware(dt):
    """Tell whether *dt* is an aware datetime.

    Args:
        dt: The datetime to inspect.

    Returns:
        True when ``dt`` has a ``tzinfo`` and that ``tzinfo`` reports a UTC
        offset for it; False otherwise.
    """
    zone = dt.tzinfo
    if zone is None:
        return False
    # A tzinfo object may still decline to give an offset; that counts as naive.
    return zone.utcoffset(dt) is not None
|
|
|
|
def make_timezone_aware(dt, tzstr):
    """Attach a time zone to *dt* unless it already has one.

    Args:
        dt: The datetime to normalise.
        tzstr: A UTC offset such as ``"-05:00"`` or ``"+0530"``. When falsy,
            ``LEGACY_TIMEZONE`` is used instead.

    Returns:
        ``dt`` itself when it is already aware; otherwise a copy whose
        ``tzinfo`` is set to the chosen zone. The wall-clock fields are not
        shifted, only labelled.

    Raises:
        ValueError: If ``dt`` is naive and ``tzstr`` is non-empty but is not a
            parseable UTC offset.
    """
    # Decide first whether a zone is needed at all: an already-aware datetime
    # must not fail just because the accompanying offset string is unusable.
    if is_timezone_aware(dt):
        return dt

    if tzstr:
        # "%z" yields a fixed-offset tzinfo; the colon is dropped so both
        # "+05:30" and "+0530" are accepted.
        tzval = datetime.strptime(tzstr.replace(":", ""), "%z").tzinfo
    else:
        tzval = LEGACY_TIMEZONE
    return dt.replace(tzinfo=tzval)
|
|
|
|
def get_db_connection():
    """Open a new SQLite connection to ``DATABASE``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DATABASE)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
def initialize_database():
    """Create the schema if it is missing and seed an empty ``users`` table.

    Creates the ``users`` and ``entries`` tables when they do not exist, then
    inserts the default users Alice, Bob and Charlie only if ``users`` has no
    rows. The connection is closed even when a statement raises.
    """
    with closing(get_db_connection()) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS entries (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                entrytype TEXT NOT NULL CHECK(entrytype IN ('in', 'out')),
                ts TEXT NOT NULL,
                paid BOOLEAN NOT NULL DEFAULT FALSE,
                comments TEXT,
                FOREIGN KEY(user_id) REFERENCES users(id)
            )
        """)
        conn.commit()

        cursor.execute("SELECT COUNT(*) AS count FROM users")
        if cursor.fetchone()["count"] == 0:
            # Seed only a brand-new database so existing users are never
            # duplicated on restart.
            cursor.executemany(
                "INSERT INTO users (name) VALUES (?)",
                [("Alice",), ("Bob",), ("Charlie",)],
            )
            conn.commit()
|
|
|
|
def parse_iso_datetime(value):
    """Parse an ISO 8601 date/time string into a ``datetime``.

    Accepts either ``"T"`` or a space between the date and the time, ignores
    surrounding whitespace, and understands a trailing ``"Z"`` as UTC.

    Args:
        value: The text to parse, e.g. ``"2024-01-02 03:04"`` or
            ``"2024-01-02T03:04:05.123Z"``.

    Returns:
        A ``datetime``; aware when the text carries an offset or ``"Z"``,
        naive otherwise.

    Raises:
        ValueError: If the text is not a valid ISO 8601 date/time.
    """
    normalized = value.strip().replace(" ", "T")
    # datetime.fromisoformat() only accepts the "Z" suffix from Python 3.11 on;
    # spelling it as an explicit offset parses the same on every version.
    if normalized.endswith(("Z", "z")):
        normalized = normalized[:-1] + "+00:00"
    return datetime.fromisoformat(normalized)
|
|
|
|
def est_now():
    """Return the current time as an aware datetime in ``America/New_York``.

    Despite the name, the zone follows New York civil time, so the offset is
    whatever that zone observes at the moment of the call.
    """
    new_york = ZoneInfo("America/New_York")
    return datetime.now(tz=new_york)
|
|
|
|
def pay_hours(user_id, begin_date, end_date, client_timezone=None):
    """Mark a user's entries inside a time window as paid.

    Args:
        user_id: Id of the user whose entries are updated.
        begin_date: ISO 8601 text for the start of the window (inclusive).
        end_date: ISO 8601 text for the end of the window (inclusive).
        client_timezone: Optional UTC offset string (e.g. ``"-05:00"``) applied
            to bounds that carry no offset of their own. When omitted, such
            bounds are interpreted in ``LEGACY_TIMEZONE``, the same rule
            ``generate_report`` uses, so both functions select the same rows
            for the same form input.

    Raises:
        ValueError: If a bound or the offset string cannot be parsed.
    """
    # Resolve the bounds before touching the database so bad input cannot
    # leave a connection open.
    begin_dt = make_timezone_aware(parse_iso_datetime(begin_date), client_timezone)
    end_dt = make_timezone_aware(parse_iso_datetime(end_date), client_timezone)

    with closing(get_db_connection()) as conn:
        # Statement tracing is diagnostic output, so it is logged at DEBUG
        # rather than reported as an error.
        conn.set_trace_callback(logging.debug)
        # SQLite's datetime() normalises offset-bearing text to UTC, so the
        # comparison is made on instants rather than on raw strings.
        conn.execute("""
            UPDATE entries
            SET paid = TRUE
            WHERE user_id = ?
              AND datetime(ts) BETWEEN datetime(?) AND datetime(?)
        """, (user_id, begin_dt.isoformat(), end_dt.isoformat()))
        conn.commit()
|
|
|
|
def create_entry(user_id, entry_type, client_timestamp=None, comments=None, client_timezone=None):
    """Insert one unpaid clock-in or clock-out entry for a user.

    Args:
        user_id: Id of the user the entry belongs to.
        entry_type: ``"in"`` or ``"out"``; the table's CHECK constraint rejects
            anything else.
        client_timestamp: Optional ISO 8601 text for the entry time. When
            falsy, the current New York time is used.
        comments: Optional free text; an empty value is stored as NULL.
        client_timezone: Optional UTC offset string applied to a
            ``client_timestamp`` that has no offset of its own.

    Raises:
        ValueError: If ``client_timestamp`` or ``client_timezone`` cannot be
            parsed.
        sqlite3.IntegrityError: If the row violates a table constraint.
    """
    # Work out the timestamp before opening the connection so a parse failure
    # cannot leave a connection open.
    if client_timestamp:
        timestamp = make_timezone_aware(parse_iso_datetime(client_timestamp), client_timezone)
    else:
        timestamp = est_now()

    with closing(get_db_connection()) as conn:
        # Binding NULL for comments stores the same row as omitting the column,
        # so one statement covers both the commented and uncommented case.
        conn.execute("""
            INSERT INTO entries (user_id, entrytype, ts, paid, comments)
            VALUES (?, ?, ?, ?, ?)
        """, (user_id, entry_type, timestamp.isoformat(), False, comments or None))
        conn.commit()
|
|
|
|
def generate_report(user_id, begin_date, end_date, client_timezone):
    """Summarise a user's worked and paid hours inside a time window.

    Entries are walked in chronological order. Each "out" entry is paired with
    the most recent unmatched "in" entry; the time between them counts toward
    the total, and also toward the paid total when the "out" row is flagged
    paid. An "out" with no open "in" is ignored.

    Args:
        user_id: Id of the user to report on.
        begin_date: ISO 8601 text for the start of the window (inclusive).
        end_date: ISO 8601 text for the end of the window (inclusive).
        client_timezone: UTC offset string applied to bounds that carry no
            offset; when falsy, ``LEGACY_TIMEZONE`` is used.

    Returns:
        A tuple ``(total_hours, paid_hours, actions)``: both hour figures are
        rounded to two decimals, and ``actions`` lists the non-empty comments
        in chronological order, prefixed with ``"(PAID) "`` for paid rows.

    Raises:
        ValueError: If a bound, the offset string, or a stored timestamp cannot
            be parsed.
    """
    # Diagnostic tracing only, so it is logged at DEBUG rather than ERROR.
    logging.debug("user_id=%s, begin_date=%s, end_date=%s", user_id, begin_date, end_date)

    # Resolve the bounds before opening the connection so bad input cannot
    # leave a connection open.
    begin_dt = make_timezone_aware(parse_iso_datetime(begin_date), client_timezone)
    end_dt = make_timezone_aware(parse_iso_datetime(end_date), client_timezone)
    logging.debug("begin_dt=%s, end_dt=%s", begin_dt, end_dt)

    with closing(get_db_connection()) as conn:
        rows = conn.execute("""
            SELECT entrytype, ts, paid, comments
            FROM entries
            WHERE user_id = ?
              AND datetime(ts) BETWEEN datetime(?) AND datetime(?)
            ORDER BY ts ASC
        """, (user_id, begin_dt.isoformat(), end_dt.isoformat())).fetchall()

    # The SQL ordering compares ts as text, which is not chronological when
    # rows were stored with different UTC offsets. Re-sort on the parsed
    # instants (the sort is stable, so ties keep the SQL order). Converting to
    # UTC also makes the subtraction below exact.
    # NOTE(review): a stored timestamp without an offset is treated as
    # LEGACY_TIMEZONE wall time here - confirm that matches how such rows were
    # written.
    timed_rows = sorted(
        (
            (
                make_timezone_aware(parse_iso_datetime(row["ts"]), None).astimezone(timezone.utc),
                row,
            )
            for row in rows
        ),
        key=lambda pair: pair[0],
    )

    total_seconds = 0
    paid_seconds = 0
    actions = []
    clock_in_time = None

    for timestamp, row in timed_rows:
        # datetime() in SQL works at whole-second precision; re-check the exact
        # bounds here.
        if timestamp < begin_dt or timestamp > end_dt:
            continue

        if row["entrytype"] == "in":
            clock_in_time = timestamp
        elif row["entrytype"] == "out" and clock_in_time is not None:
            worked = (timestamp - clock_in_time).total_seconds()
            total_seconds += worked
            if row["paid"]:
                paid_seconds += worked
            # Close the shift so a second consecutive "out" cannot count the
            # same clock-in again.
            clock_in_time = None

        if row["comments"]:
            if row["paid"]:
                actions.append("(PAID) {}".format(row["comments"]))
            else:
                actions.append(row["comments"])

    total_hours = total_seconds / 3600.0
    paid_hours = paid_seconds / 3600.0
    return round(total_hours, 2), round(paid_hours, 2), actions
|
|
|
|
@app.route("/", methods=["GET", "POST"])
def index():
    """Serve the time-clock page and handle its form actions.

    On POST the ``action`` form field selects what happens: ``clock_in`` and
    ``clock_out`` record an entry, ``pay`` marks a window of entries as paid,
    and ``report`` builds a report for the selected user and makes the
    submitted window the one shown on the page. Every request then builds a
    per-user summary for the displayed window (the last seven days unless a
    report replaced it) and the list of users whose latest entry is a
    clock-in, and renders ``index.html``.
    """
    user_report = {}
    selected_user_id = None

    # Default window: the seven days ending now (New York time).
    current_time = est_now()
    default_begin = current_time - timedelta(days=7)
    default_end = current_time

    db = get_db_connection()
    users = db.execute("""
        SELECT id, name
        FROM users
        ORDER BY name
    """).fetchall()
    db.close()

    if request.method == "POST":
        form = request.form
        selected_user_id = form.get("user_id")
        action = form.get("action")
        client_timestamp = form.get("client_timestamp")
        client_timezone = form.get("client_timezone")
        comments = form.get("comments")

        # Clock-in and clock-out differ only in the entry type they record.
        entry_type_for_action = {"clock_in": "in", "clock_out": "out"}

        if action in entry_type_for_action:
            create_entry(
                selected_user_id,
                entry_type_for_action[action],
                client_timestamp,
                comments,
                client_timezone,
            )
        elif action == "pay":
            pay_hours(
                selected_user_id,
                form.get("begin_date"),
                form.get("end_date"),
                client_timezone,
            )
        elif action == "report":
            begin_date = form.get("begin_date")
            end_date = form.get("end_date")
            hours_total, hours_paid, report_actions = generate_report(
                selected_user_id,
                begin_date,
                end_date,
                client_timezone,
            )
            user_report = {
                "total_hours": hours_total,
                "paid_hours": hours_paid,
                "actions": report_actions,
            }
            # Show the requested window on the page and use it for the
            # per-user summary below.
            default_begin = parse_iso_datetime(begin_date)
            default_end = parse_iso_datetime(end_date)

    # Per-user summary over the window currently shown.
    window_begin = default_begin.isoformat()
    window_end = default_end.isoformat()
    summary_timezone = request.form.get("client_timezone")
    all_user_reports = []
    for person in users:
        hours_total, hours_paid, person_actions = generate_report(
            person["id"],
            window_begin,
            window_end,
            summary_timezone,
        )
        all_user_reports.append({
            "id": person["id"],
            "name": person["name"],
            "total_hours": hours_total,
            "paid_hours": hours_paid,
            "actions": person_actions,
        })

    # Users whose most recent entry is a clock-in. This runs after the POST
    # handling so an entry recorded by this request is reflected.
    # NOTE(review): MAX(ts) compares the stored timestamps as text; rows saved
    # with different UTC offsets may not order chronologically - confirm.
    db = get_db_connection()
    clocked_in_rows = db.execute("""
        SELECT u.id, u.name, last_entry.ts
        FROM users u
        JOIN (
            SELECT e.user_id, e.entrytype, e.ts
            FROM entries e
            INNER JOIN (
                SELECT user_id, MAX(ts) AS max_ts
                FROM entries
                GROUP BY user_id
            ) latest
            ON e.user_id = latest.user_id
            AND e.ts = latest.max_ts
        ) last_entry
        ON u.id = last_entry.user_id
        WHERE last_entry.entrytype = 'in'
        ORDER BY u.name
    """).fetchall()
    db.close()

    clocked_in_list = [
        {
            "id": row["id"],
            "name": row["name"],
            "since": parse_iso_datetime(row["ts"]).isoformat(),
        }
        for row in clocked_in_rows
    ]

    return render_template(
        "index.html",
        users=users,
        user_report=user_report,
        all_user_reports=all_user_reports,
        clocked_in_users=clocked_in_list,
        default_begin=default_begin.isoformat(timespec="minutes"),
        default_end=default_end.isoformat(timespec="minutes"),
        selected_user_id=selected_user_id,
    )
|
|
|
|
if __name__ == "__main__":
    # Make sure the tables and the seed users exist before serving requests.
    initialize_database()

    # Listen on every network interface on port 5000, with Flask's debug mode
    # switched off.
    server_options = {"host": "0.0.0.0", "port": 5000, "debug": False}
    app.run(**server_options)
|