Add pagerank algorithm

Peter Vacho 2024-11-24 19:04:21 +01:00
parent e853747cdd
commit e7f0b5ce4e
Signed by: school
GPG key ID: 8CFC3837052871B4
2 changed files with 117 additions and 2 deletions


@@ -1,10 +1,12 @@
 import asyncio
 import re
 from pprint import pprint
+from time import perf_counter
 
 import httpx
 
 from src.link_scraper import get_urlmap, standard_urlmap_exception_suppressor
+from src.pagerank import pagerank, test_pagerank
 
 URL = httpx.URL("https://ailab.fai.utb.cz")
 ALLOWED_HOSTS_RE = re.compile(r"(?:.*\.)?utb\.cz")
@@ -12,17 +14,37 @@ ALLOWED_HOSTS_RE = re.compile(r"(?:.*\.)?utb\.cz")
 async def main() -> None:
     """Program entrypoint."""
+    # Run a unit-test for the page-rank algorithm, according to the sample/test data
+    # in the assignment.
+    test_pagerank()
+
+    print("Scraping...")
+    start = perf_counter()
     async with httpx.AsyncClient() as client:
         url_map = await get_urlmap(
             client,
             URL,
-            max_depth=3,
+            max_depth=2,
             filter_condition=lambda url: ALLOWED_HOSTS_RE.fullmatch(url.host) is not None,
             suppress_exception=standard_urlmap_exception_suppressor,
         )
     pprint(url_map)
+
+    took = perf_counter() - start
+    print(f"Took: {round(took, 2)} seconds")
+
+    print("Ranking...")
+    start = perf_counter()
+    ranking = pagerank(url_map)
+    sorted_urls = list(ranking.keys())
+    sorted_urls.sort(key=lambda url: ranking[url])
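+    # Note: the sort above is ascending by rank, so the highest-ranked
+    # URLs come last in the printed list.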
+    took = perf_counter() - start
+    print(f"Took: {round(took, 2)} seconds")
+    print("Done")
+
+    pprint(sorted_urls)
 
 
 if __name__ == "__main__":

src/pagerank.py (new file, 93 lines added)

@@ -0,0 +1,93 @@
+import numpy as np
+
+
+def pagerank[T](
+    link_map: dict[T, set[T]],
+    beta: float = 0.85,
+    max_iter: int = 100,
+    tol: float = 1e-6,
+) -> dict[T, float]:
+    """Implementation of the PageRank algorithm based on the given procedure.
+
+    :param link_map: A map of links {source URL -> set of target URLs}.
+    :param beta: Probability of following existing links (damping factor).
+    :param max_iter: Maximum number of iterations for the algorithm.
+    :param tol: Tolerance for the difference between iterations (convergence threshold).
+    :return: A dictionary where the key is a URL and the value is its rank.
+    """
+    # Get unique pages (sources, plus pages that only appear as link targets)
+    pages = list(link_map.keys() | {link for links in link_map.values() for link in links})
+    n = len(pages)
+    page_indices = {page: i for i, page in enumerate(pages)}  # Map pages to indices
+
+    # Build the column-stochastic transition matrix M, where m[i, j] is the
+    # probability of moving from page j to page i. Iterating over `pages`
+    # (rather than just `link_map`) also covers pages that only appear as
+    # targets, which would otherwise be left with an all-zero column.
+    m = np.zeros((n, n))
+    for src in pages:
+        targets = link_map.get(src, set())
+        if not targets:
+            # If the page has no outgoing links, assume it links to all pages (including itself)
+            m[:, page_indices[src]] = 1 / n
+        else:
+            # Calculate probabilities for all outgoing links from the given page
+            for target in targets:
+                m[page_indices[target], page_indices[src]] = 1 / len(targets)
+
+    # Create the E matrix (uniform teleportation: every page is reached with probability 1/n)
+    e = np.ones((n, n)) / n
+    # Create the A ("Google") matrix: follow a link with probability beta, teleport otherwise
+    a = beta * m + (1 - beta) * e
+
+    # Initialize ranks (r(0)) to a uniform distribution
+    ranks = np.ones(n) / n
+
+    # Iteratively calculate PageRank
+    for _ in range(max_iter):
+        new_ranks = a @ ranks  # r(t+1) = A . r(t)
+        if np.linalg.norm(new_ranks - ranks, ord=1) < tol:  # Convergence check
+            break
+        ranks = new_ranks
+
+    # Return ranks as a {page: rank} dict (pages keep the generic type T)
+    return {page: float(ranks[idx]) for page, idx in page_indices.items()}
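+
+
+# Example usage (a hypothetical sketch, not part of the assignment code;
+# the page names here are made up):
+#
+#     link_map = {"a": {"b"}, "b": {"a", "c"}, "c": {"a"}}
+#     ranks = pagerank(link_map)
+#     top = max(ranks, key=lambda page: ranks[page])  # highest-ranked page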
+
+
+def test_pagerank() -> None:
+    """A simple function to test out the functionality of the pagerank algorithm.
+
+    This uses the provided sample numbers given in the task assignment.
+    """
+    # The assignment only provided the following string as input, so this test
+    # also contains the logic to parse it into a proper Python dict
+    inp = "{1 -> 2, 1 -> 3, 2 -> 4, 3 -> 1, 3 -> 2, 3 -> 4, 4 -> 3}"
+    link_map: dict[int, set[int]] = {}
+    for pairing in inp.strip("{}").split(", "):
+        key, val = pairing.split(" -> ")
+        link_map.setdefault(int(key), set()).add(int(val))
+
+    # The expected results were shown in an image in the assignment, so they
+    # are rewritten here as: (iteration count -> expected ranks for pages 1-4)
+    results = {
+        0: [0.25, 0.25, 0.25, 0.25],
+        1: [0.10833333, 0.21458333, 0.35625, 0.32083333],
+        2: [0.1384375, 0.18447917, 0.35625, 0.32083333],
+        3: [0.1384375, 0.19727344, 0.36904427, 0.29524479],
+        4: [0.14206254, 0.20089848, 0.34729401, 0.30974497],
+        5: [0.13589997, 0.19627655, 0.3611598, 0.30666368],
+    }
+
+    # To how many decimal digits the results should be verified
+    check_precision = 5
+    for it_count, expected in results.items():
+        # Use a tolerance of 0 to make sure we run for exactly the given number of iterations
+        actual = pagerank(link_map, max_iter=it_count, tol=0)
+        # Convert to a list (matching the format of the expected results)
+        actual = [actual[1], actual[2], actual[3], actual[4]]
+        for pos, (expected_num, actual_num) in enumerate(zip(expected, actual, strict=True)):
+            assert round(expected_num, check_precision) == round(  # noqa: S101
+                actual_num, check_precision
+            ), f"Invalid result for it={it_count} for {pos + 1}: {expected_num} != {actual_num}"